Home | History | Annotate | Download | only in test
      1 #!/usr/bin/python
      2 #
      3 # Copyright 2014 The Android Open Source Project
      4 #
      5 # Licensed under the Apache License, Version 2.0 (the "License");
      6 # you may not use this file except in compliance with the License.
      7 # You may obtain a copy of the License at
      8 #
      9 # http://www.apache.org/licenses/LICENSE-2.0
     10 #
     11 # Unless required by applicable law or agreed to in writing, software
     12 # distributed under the License is distributed on an "AS IS" BASIS,
     13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14 # See the License for the specific language governing permissions and
     15 # limitations under the License.
     16 
     17 # pylint: disable=g-bad-todo
     18 
     19 import errno
     20 import os
     21 import posix
     22 import random
     23 from socket import *  # pylint: disable=wildcard-import
     24 import struct
     25 import sys
     26 import threading
     27 import time
     28 import unittest
     29 
     30 from scapy import all as scapy
     31 
     32 import csocket
     33 import multinetwork_base
     34 import net_test
     35 
     36 
     37 HAVE_PROC_NET_ICMP6 = os.path.isfile("/proc/net/icmp6")
     38 
     39 ICMP_ECHO = 8
     40 ICMP_ECHOREPLY = 0
     41 ICMPV6_ECHO_REQUEST = 128
     42 ICMPV6_ECHO_REPLY = 129
     43 IPV6_MIN_MTU = 1280
     44 ICMPV6_HEADER_LEN = 8
     45 ICMPV6_PKT_TOOBIG = 2
     46 
     47 
     48 class PingReplyThread(threading.Thread):
     49 
     50   MIN_TTL = 10
     51   INTERMEDIATE_IPV4 = "192.0.2.2"
     52   INTERMEDIATE_IPV6 = "2001:db8:1:2::ace:d00d"
     53   NEIGHBOURS = ["fe80::1"]
     54   LINK_MTU = 1300
     55 
     56   def __init__(self, tun, mymac, routermac, routeraddr):
     57     super(PingReplyThread, self).__init__()
     58     self._tun = tun
     59     self._started = False
     60     self._stopped = False
     61     self._mymac = mymac
     62     self._routermac = routermac
     63     self._routeraddr = routeraddr
     64 
     65   def IsStarted(self):
     66     return self._started
     67 
     68   def Stop(self):
     69     self._stopped = True
     70 
     71   def ChecksumValid(self, packet):
     72     # Get and clear the checksums.
     73     def GetAndClearChecksum(layer):
     74       if not layer:
     75         return
     76       try:
     77         checksum = layer.chksum
     78         del layer.chksum
     79       except AttributeError:
     80         checksum = layer.cksum
     81         del layer.cksum
     82       return checksum
     83 
     84     def GetChecksum(layer):
     85       try:
     86         return layer.chksum
     87       except AttributeError:
     88         return layer.cksum
     89 
     90     layers = ["IP", "ICMP", scapy.ICMPv6EchoRequest]
     91     sums = {}
     92     for name in layers:
     93       sums[name] = GetAndClearChecksum(packet.getlayer(name))
     94 
     95     # Serialize the packet, so scapy recalculates the checksums, and compare
     96     # them with the ones in the packet.
     97     packet = packet.__class__(str(packet))
     98     for name in layers:
     99       layer = packet.getlayer(name)
    100       if layer and GetChecksum(layer) != sums[name]:
    101         return False
    102 
    103     return True
    104 
    105   def SendTimeExceeded(self, version, packet):
    106     if version == 4:
    107       src = packet.getlayer(scapy.IP).src
    108       self.SendPacket(
    109           scapy.IP(src=self.INTERMEDIATE_IPV4, dst=src) /
    110           scapy.ICMP(type=11, code=0) /
    111           packet)
    112     elif version == 6:
    113       src = packet.getlayer(scapy.IPv6).src
    114       self.SendPacket(
    115           scapy.IPv6(src=self.INTERMEDIATE_IPV6, dst=src) /
    116           scapy.ICMPv6TimeExceeded(code=0) /
    117           packet)
    118 
    119   def SendPacketTooBig(self, packet):
    120       src = packet.getlayer(scapy.IPv6).src
    121       datalen = IPV6_MIN_MTU - ICMPV6_HEADER_LEN
    122       self.SendPacket(
    123           scapy.IPv6(src=self.INTERMEDIATE_IPV6, dst=src) /
    124           scapy.ICMPv6PacketTooBig(mtu=self.LINK_MTU) /
    125           str(packet)[:datalen])
    126 
    127   def IPv4Packet(self, ip):
    128     icmp = ip.getlayer(scapy.ICMP)
    129 
    130     # We only support ping for now.
    131     if (ip.proto != IPPROTO_ICMP or
    132         icmp.type != ICMP_ECHO or
    133         icmp.code != 0):
    134       return
    135 
    136     # Check the checksums.
    137     if not self.ChecksumValid(ip):
    138       return
    139 
    140     if ip.ttl < self.MIN_TTL:
    141       self.SendTimeExceeded(4, ip)
    142       return
    143 
    144     icmp.type = ICMP_ECHOREPLY
    145     self.SwapAddresses(ip)
    146     self.SendPacket(ip)
    147 
    148   def IPv6Packet(self, ipv6):
    149     icmpv6 = ipv6.getlayer(scapy.ICMPv6EchoRequest)
    150 
    151     # We only support ping for now.
    152     if (ipv6.nh != IPPROTO_ICMPV6 or
    153         not icmpv6 or
    154         icmpv6.type != ICMPV6_ECHO_REQUEST or
    155         icmpv6.code != 0):
    156       return
    157 
    158     # Check the checksums.
    159     if not self.ChecksumValid(ipv6):
    160       return
    161 
    162     if ipv6.dst.startswith("ff02::"):
    163       ipv6.dst = ipv6.src
    164       for src in [self._routeraddr]:
    165         ipv6.src = src
    166         icmpv6.type = ICMPV6_ECHO_REPLY
    167         self.SendPacket(ipv6)
    168     elif ipv6.hlim < self.MIN_TTL:
    169       self.SendTimeExceeded(6, ipv6)
    170     elif ipv6.plen > self.LINK_MTU:
    171       self.SendPacketTooBig(ipv6)
    172     else:
    173       icmpv6.type = ICMPV6_ECHO_REPLY
    174       if ipv6.dst.startswith("fe80:") and ipv6.dst != self._routeraddr:
    175         return
    176       self.SwapAddresses(ipv6)
    177       self.SendPacket(ipv6)
    178 
    179   def SwapAddresses(self, packet):
    180     src = packet.src
    181     packet.src = packet.dst
    182     packet.dst = src
    183 
    184   def SendPacket(self, packet):
    185     packet = scapy.Ether(src=self._routermac, dst=self._mymac) / packet
    186     try:
    187       posix.write(self._tun.fileno(), str(packet))
    188     except Exception, e:
    189       if not self._stopped:
    190         raise e
    191 
    192   def run(self):
    193     self._started = True
    194     while not self._stopped:
    195       try:
    196         packet = posix.read(self._tun.fileno(), 4096)
    197       except OSError, e:
    198         if e.errno == errno.EAGAIN:
    199           continue
    200         else:
    201           break
    202       except ValueError, e:
    203         if not self._stopped:
    204           raise e
    205 
    206       ether = scapy.Ether(packet)
    207       if ether.type == net_test.ETH_P_IPV6:
    208         self.IPv6Packet(ether.payload)
    209       elif ether.type == net_test.ETH_P_IP:
    210         self.IPv4Packet(ether.payload)
    211 
    212 
    213 class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
    214 
    215   @classmethod
    216   def WaitForReplyThreads(cls):
    217     # Wait 2s for the reply threads to start. If they don't, don't blow up, as
    218     # that would cause tearDownClass not to be called and thus not clean up
    219     # routing configuration, breaking subsequent tests. Instead, just let these
    220     # tests fail.
    221     _INTERVAL = 0.1
    222     _ATTEMPTS = 20
    223     for i in xrange(0, _ATTEMPTS):
    224       for netid in cls.NETIDS:
    225         if all(thread.IsStarted() for thread in cls.reply_threads.values()):
    226           return
    227         time.sleep(_INTERVAL)
    228     msg = "WARNING: reply threads not all started after %.1f seconds\n" % (
    229         _ATTEMPTS * _INTERVAL)
    230     sys.stderr.write(msg)
    231 
    232   @classmethod
    233   def StopReplyThreads(cls):
    234     for thread in cls.reply_threads.values():
    235       thread.Stop()
    236 
    237   @classmethod
    238   def setUpClass(cls):
    239     super(Ping6Test, cls).setUpClass()
    240     cls.reply_threads = {}
    241     for netid in cls.NETIDS:
    242       cls.reply_threads[netid] = PingReplyThread(
    243         cls.tuns[netid],
    244         cls.MyMacAddress(netid),
    245         cls.RouterMacAddress(netid),
    246         cls._RouterAddress(netid, 6))
    247       cls.reply_threads[netid].start()
    248     cls.WaitForReplyThreads()
    249     cls.netid = random.choice(cls.NETIDS)
    250     cls.SetDefaultNetwork(cls.netid)
    251 
    252   @classmethod
    253   def tearDownClass(cls):
    254     cls.StopReplyThreads()
    255     cls.ClearDefaultNetwork()
    256     super(Ping6Test, cls).tearDownClass()
    257 
    258   def setUp(self):
    259     self.ifname = self.GetInterfaceName(self.netid)
    260     self.ifindex = self.ifindices[self.netid]
    261     self.lladdr = net_test.GetLinkAddress(self.ifname, True)
    262     self.globaladdr = net_test.GetLinkAddress(self.ifname, False)
    263 
    264   def assertValidPingResponse(self, s, data):
    265     family = s.family
    266 
    267     # Receive the reply.
    268     rcvd, src = s.recvfrom(32768)
    269     self.assertNotEqual(0, len(rcvd), "No data received")
    270 
    271     # If this is a dual-stack socket sending to a mapped IPv4 address, treat it
    272     # as IPv4.
    273     if src[0].startswith("::ffff:"):
    274       family = AF_INET
    275       src = (src[0].replace("::ffff:", ""), src[1:])
    276 
    277     # Check the data being sent is valid.
    278     self.assertGreater(len(data), 7, "Not enough data for ping packet")
    279     if family == AF_INET:
    280       self.assertTrue(data.startswith("\x08\x00"), "Not an IPv4 echo request")
    281     elif family == AF_INET6:
    282       self.assertTrue(data.startswith("\x80\x00"), "Not an IPv6 echo request")
    283     else:
    284       self.fail("Unknown socket address family %d" * s.family)
    285 
    286     # Check address, ICMP type, and ICMP code.
    287     if family == AF_INET:
    288       addr, unused_port = src
    289       self.assertGreaterEqual(len(addr), len("1.1.1.1"))
    290       self.assertTrue(rcvd.startswith("\x00\x00"), "Not an IPv4 echo reply")
    291     else:
    292       addr, unused_port, flowlabel, scope_id = src  # pylint: disable=unbalanced-tuple-unpacking
    293       self.assertGreaterEqual(len(addr), len("::"))
    294       self.assertTrue(rcvd.startswith("\x81\x00"), "Not an IPv6 echo reply")
    295       # Check that the flow label is zero and that the scope ID is sane.
    296       self.assertEqual(flowlabel, 0)
    297       if addr.startswith("fe80::"):
    298         self.assertTrue(scope_id in self.ifindices.values())
    299       else:
    300         self.assertEquals(0, scope_id)
    301 
    302     # TODO: check the checksum. We can't do this easily now for ICMPv6 because
    303     # we don't have the IP addresses so we can't construct the pseudoheader.
    304 
    305     # Check the sequence number and the data.
    306     self.assertEqual(len(data), len(rcvd))
    307     self.assertEqual(data[6:].encode("hex"), rcvd[6:].encode("hex"))
    308 
    309   def CheckSockStatFile(self, name, srcaddr, srcport, dstaddr, dstport, state,
    310                         txmem=0, rxmem=0):
    311     expected = ["%s:%04X" % (net_test.FormatSockStatAddress(srcaddr), srcport),
    312                 "%s:%04X" % (net_test.FormatSockStatAddress(dstaddr), dstport),
    313                 "%02X" % state,
    314                 "%08X:%08X" % (txmem, rxmem),
    315                 str(os.getuid()), "2", "0"]
    316     actual = self.ReadProcNetSocket(name)[-1]
    317     self.assertListEqual(expected, actual)
    318 
    319   def testIPv4SendWithNoConnection(self):
    320     s = net_test.IPv4PingSocket()
    321     self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV4_PING)
    322 
    323   def testIPv6SendWithNoConnection(self):
    324     s = net_test.IPv6PingSocket()
    325     self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV6_PING)
    326 
    327   def testIPv4LoopbackPingWithConnect(self):
    328     s = net_test.IPv4PingSocket()
    329     s.connect(("127.0.0.1", 55))
    330     data = net_test.IPV4_PING + "foobarbaz"
    331     s.send(data)
    332     self.assertValidPingResponse(s, data)
    333 
    334   def testIPv6LoopbackPingWithConnect(self):
    335     s = net_test.IPv6PingSocket()
    336     s.connect(("::1", 55))
    337     s.send(net_test.IPV6_PING)
    338     self.assertValidPingResponse(s, net_test.IPV6_PING)
    339 
    340   def testIPv4PingUsingSendto(self):
    341     s = net_test.IPv4PingSocket()
    342     written = s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
    343     self.assertEquals(len(net_test.IPV4_PING), written)
    344     self.assertValidPingResponse(s, net_test.IPV4_PING)
    345 
    346   def testIPv6PingUsingSendto(self):
    347     s = net_test.IPv6PingSocket()
    348     written = s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    349     self.assertEquals(len(net_test.IPV6_PING), written)
    350     self.assertValidPingResponse(s, net_test.IPV6_PING)
    351 
    352   def testIPv4NoCrash(self):
    353     # Python 2.x does not provide either read() or recvmsg.
    354     s = net_test.IPv4PingSocket()
    355     written = s.sendto(net_test.IPV4_PING, ("127.0.0.1", 55))
    356     self.assertEquals(len(net_test.IPV4_PING), written)
    357     fd = s.fileno()
    358     reply = posix.read(fd, 4096)
    359     self.assertEquals(written, len(reply))
    360 
    361   def testIPv6NoCrash(self):
    362     # Python 2.x does not provide either read() or recvmsg.
    363     s = net_test.IPv6PingSocket()
    364     written = s.sendto(net_test.IPV6_PING, ("::1", 55))
    365     self.assertEquals(len(net_test.IPV6_PING), written)
    366     fd = s.fileno()
    367     reply = posix.read(fd, 4096)
    368     self.assertEquals(written, len(reply))
    369 
    370   def testCrossProtocolCrash(self):
    371     # Checks that an ICMP error containing a ping packet that matches the ID
    372     # of a socket of the wrong protocol (which can happen when using 464xlat)
    373     # doesn't crash the kernel.
    374 
    375     # We can only test this using IPv6 unreachables and IPv4 ping sockets,
    376     # because IPv4 packets sent by scapy.send() on loopback are not received by
    377     # the kernel. So we don't actually use this function yet.
    378     def GetIPv4Unreachable(port):  # pylint: disable=unused-variable
    379       return (scapy.IP(src="192.0.2.1", dst="127.0.0.1") /
    380               scapy.ICMP(type=3, code=0) /
    381               scapy.IP(src="127.0.0.1", dst="127.0.0.1") /
    382               scapy.ICMP(type=8, id=port, seq=1))
    383 
    384     def GetIPv6Unreachable(port):
    385       return (scapy.IPv6(src="::1", dst="::1") /
    386               scapy.ICMPv6DestUnreach() /
    387               scapy.IPv6(src="::1", dst="::1") /
    388               scapy.ICMPv6EchoRequest(id=port, seq=1, data="foobarbaz"))
    389 
    390     # An unreachable matching the ID of a socket of the wrong protocol
    391     # shouldn't crash.
    392     s = net_test.IPv4PingSocket()
    393     s.connect(("127.0.0.1", 12345))
    394     _, port = s.getsockname()
    395     scapy.send(GetIPv6Unreachable(port), verbose=False)
    396     # No crash? Good.
    397 
    398   def testCrossProtocolCalls(self):
    399     """Tests that passing in the wrong family returns EAFNOSUPPORT.
    400 
    401     Relevant kernel commits:
    402       upstream net:
    403         91a0b60 net/ping: handle protocol mismatching scenario
    404         9145736d net: ping: Return EAFNOSUPPORT when appropriate.
    405 
    406       android-3.10:
    407         78a6809 net/ping: handle protocol mismatching scenario
    408         428e6d6 net: ping: Return EAFNOSUPPORT when appropriate.
    409     """
    410 
    411     def CheckEAFNoSupport(function, *args):
    412       self.assertRaisesErrno(errno.EAFNOSUPPORT, function, *args)
    413 
    414     ipv6sockaddr = csocket.Sockaddr((net_test.IPV6_ADDR, 53))
    415 
    416     # In order to check that IPv6 socket calls return EAFNOSUPPORT when passed
    417     # IPv4 socket address structures, we need to pass down a socket address
    418     # length argument that's at least sizeof(sockaddr_in6). Otherwise, the calls
    419     # will fail immediately with EINVAL because the passed-in socket length is
    420     # too short. So create a sockaddr_in that's as long as a sockaddr_in6.
    421     ipv4sockaddr = csocket.Sockaddr((net_test.IPV4_ADDR, 53))
    422     ipv4sockaddr = csocket.SockaddrIn6(
    423         ipv4sockaddr.Pack() +
    424         "\x00" * (len(csocket.SockaddrIn6) - len(csocket.SockaddrIn)))
    425 
    426     s4 = net_test.IPv4PingSocket()
    427     s6 = net_test.IPv6PingSocket()
    428 
    429     # We can't just call s.connect(), s.bind() etc. with a tuple of the wrong
    430     # address family, because the Python implementation will just pass garbage
    431     # down to the kernel. So call the C functions directly.
    432     CheckEAFNoSupport(csocket.Bind, s4, ipv6sockaddr)
    433     CheckEAFNoSupport(csocket.Bind, s6, ipv4sockaddr)
    434     CheckEAFNoSupport(csocket.Connect, s4, ipv6sockaddr)
    435     CheckEAFNoSupport(csocket.Connect, s6, ipv4sockaddr)
    436     CheckEAFNoSupport(csocket.Sendmsg,
    437                       s4, ipv6sockaddr, net_test.IPV4_PING, None, 0)
    438     CheckEAFNoSupport(csocket.Sendmsg,
    439                       s6, ipv4sockaddr, net_test.IPV6_PING, None, 0)
    440 
    441   def testIPv4Bind(self):
    442     # Bind to unspecified address.
    443     s = net_test.IPv4PingSocket()
    444     s.bind(("0.0.0.0", 544))
    445     self.assertEquals(("0.0.0.0", 544), s.getsockname())
    446 
    447     # Bind to loopback.
    448     s = net_test.IPv4PingSocket()
    449     s.bind(("127.0.0.1", 99))
    450     self.assertEquals(("127.0.0.1", 99), s.getsockname())
    451 
    452     # Binding twice is not allowed.
    453     self.assertRaisesErrno(errno.EINVAL, s.bind, ("127.0.0.1", 22))
    454 
    455     # But binding two different sockets to the same ID is allowed.
    456     s2 = net_test.IPv4PingSocket()
    457     s2.bind(("127.0.0.1", 99))
    458     self.assertEquals(("127.0.0.1", 99), s2.getsockname())
    459     s3 = net_test.IPv4PingSocket()
    460     s3.bind(("127.0.0.1", 99))
    461     self.assertEquals(("127.0.0.1", 99), s3.getsockname())
    462 
    463     # If two sockets bind to the same port, the first one to call read() gets
    464     # the response.
    465     s4 = net_test.IPv4PingSocket()
    466     s5 = net_test.IPv4PingSocket()
    467     s4.bind(("0.0.0.0", 167))
    468     s5.bind(("0.0.0.0", 167))
    469     s4.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 44))
    470     self.assertValidPingResponse(s5, net_test.IPV4_PING)
    471     net_test.SetSocketTimeout(s4, 100)
    472     self.assertRaisesErrno(errno.EAGAIN, s4.recv, 32768)
    473 
    474     # If SO_REUSEADDR is turned off, then we get EADDRINUSE.
    475     s6 = net_test.IPv4PingSocket()
    476     s4.setsockopt(SOL_SOCKET, SO_REUSEADDR, 0)
    477     self.assertRaisesErrno(errno.EADDRINUSE, s6.bind, ("0.0.0.0", 167))
    478 
    479     # Can't bind after sendto.
    480     s = net_test.IPv4PingSocket()
    481     s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 9132))
    482     self.assertRaisesErrno(errno.EINVAL, s.bind, ("0.0.0.0", 5429))
    483 
    484   def testIPv6Bind(self):
    485     # Bind to unspecified address.
    486     s = net_test.IPv6PingSocket()
    487     s.bind(("::", 769))
    488     self.assertEquals(("::", 769, 0, 0), s.getsockname())
    489 
    490     # Bind to loopback.
    491     s = net_test.IPv6PingSocket()
    492     s.bind(("::1", 99))
    493     self.assertEquals(("::1", 99, 0, 0), s.getsockname())
    494 
    495     # Binding twice is not allowed.
    496     self.assertRaisesErrno(errno.EINVAL, s.bind, ("::1", 22))
    497 
    498     # But binding two different sockets to the same ID is allowed.
    499     s2 = net_test.IPv6PingSocket()
    500     s2.bind(("::1", 99))
    501     self.assertEquals(("::1", 99, 0, 0), s2.getsockname())
    502     s3 = net_test.IPv6PingSocket()
    503     s3.bind(("::1", 99))
    504     self.assertEquals(("::1", 99, 0, 0), s3.getsockname())
    505 
    506     # Binding both IPv4 and IPv6 to the same socket works.
    507     s4 = net_test.IPv4PingSocket()
    508     s6 = net_test.IPv6PingSocket()
    509     s4.bind(("0.0.0.0", 444))
    510     s6.bind(("::", 666, 0, 0))
    511 
    512     # Can't bind after sendto.
    513     s = net_test.IPv6PingSocket()
    514     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 9132))
    515     self.assertRaisesErrno(errno.EINVAL, s.bind, ("::", 5429))
    516 
    517   def testIPv4InvalidBind(self):
    518     s = net_test.IPv4PingSocket()
    519     self.assertRaisesErrno(errno.EADDRNOTAVAIL,
    520                            s.bind, ("255.255.255.255", 1026))
    521     self.assertRaisesErrno(errno.EADDRNOTAVAIL,
    522                            s.bind, ("224.0.0.1", 651))
    523     # Binding to an address we don't have only works with IP_TRANSPARENT.
    524     self.assertRaisesErrno(errno.EADDRNOTAVAIL,
    525                            s.bind, (net_test.IPV4_ADDR, 651))
    526     try:
    527       s.setsockopt(SOL_IP, net_test.IP_TRANSPARENT, 1)
    528       s.bind((net_test.IPV4_ADDR, 651))
    529     except IOError, e:
    530       if e.errno == errno.EACCES:
    531         pass  # We're not root. let it go for now.
    532 
    533   def testIPv6InvalidBind(self):
    534     s = net_test.IPv6PingSocket()
    535     self.assertRaisesErrno(errno.EINVAL,
    536                            s.bind, ("ff02::2", 1026))
    537 
    538     # Binding to an address we don't have only works with IPV6_TRANSPARENT.
    539     self.assertRaisesErrno(errno.EADDRNOTAVAIL,
    540                            s.bind, (net_test.IPV6_ADDR, 651))
    541     try:
    542       s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_TRANSPARENT, 1)
    543       s.bind((net_test.IPV6_ADDR, 651))
    544     except IOError, e:
    545       if e.errno == errno.EACCES:
    546         pass  # We're not root. let it go for now.
    547 
    548   def testAfUnspecBind(self):
    549     # Binding to AF_UNSPEC is treated as IPv4 if the address is 0.0.0.0.
    550     s4 = net_test.IPv4PingSocket()
    551     sockaddr = csocket.Sockaddr(("0.0.0.0", 12996))
    552     sockaddr.family = AF_UNSPEC
    553     csocket.Bind(s4, sockaddr)
    554     self.assertEquals(("0.0.0.0", 12996), s4.getsockname())
    555 
    556     # But not if the address is anything else.
    557     sockaddr = csocket.Sockaddr(("127.0.0.1", 58234))
    558     sockaddr.family = AF_UNSPEC
    559     self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s4, sockaddr)
    560 
    561     # This doesn't work for IPv6.
    562     s6 = net_test.IPv6PingSocket()
    563     sockaddr = csocket.Sockaddr(("::1", 58997))
    564     sockaddr.family = AF_UNSPEC
    565     self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s6, sockaddr)
    566 
    567   def testIPv6ScopedBind(self):
    568     # Can't bind to a link-local address without a scope ID.
    569     s = net_test.IPv6PingSocket()
    570     self.assertRaisesErrno(errno.EINVAL,
    571                            s.bind, (self.lladdr, 1026, 0, 0))
    572 
    573     # Binding to a link-local address with a scope ID works, and the scope ID is
    574     # returned by a subsequent getsockname. Interestingly, Python's getsockname
    575     # returns "fe80:1%foo", even though it does not understand it.
    576     expected = self.lladdr + "%" + self.ifname
    577     s.bind((self.lladdr, 4646, 0, self.ifindex))
    578     self.assertEquals((expected, 4646, 0, self.ifindex), s.getsockname())
    579 
    580     # Of course, for the above to work the address actually has to be configured
    581     # on the machine.
    582     self.assertRaisesErrno(errno.EADDRNOTAVAIL,
    583                            s.bind, ("fe80::f00", 1026, 0, 1))
    584 
    585     # Scope IDs on non-link-local addresses are silently ignored.
    586     s = net_test.IPv6PingSocket()
    587     s.bind(("::1", 1234, 0, 1))
    588     self.assertEquals(("::1", 1234, 0, 0), s.getsockname())
    589 
    590   def testBindAffectsIdentifier(self):
    591     s = net_test.IPv6PingSocket()
    592     s.bind((self.globaladdr, 0xf976))
    593     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    594     self.assertEquals("\xf9\x76", s.recv(32768)[4:6])
    595 
    596     s = net_test.IPv6PingSocket()
    597     s.bind((self.globaladdr, 0xace))
    598     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    599     self.assertEquals("\x0a\xce", s.recv(32768)[4:6])
    600 
    601   def testLinkLocalAddress(self):
    602     s = net_test.IPv6PingSocket()
    603     # Sending to a link-local address with no scope fails with EINVAL.
    604     self.assertRaisesErrno(errno.EINVAL,
    605                            s.sendto, net_test.IPV6_PING, ("fe80::1", 55))
    606     # Sending to link-local address with a scope succeeds. Note that Python
    607     # doesn't understand the "fe80::1%lo" format, even though it returns it.
    608     s.sendto(net_test.IPV6_PING, ("fe80::1", 55, 0, self.ifindex))
    609     # No exceptions? Good.
    610 
    611   def testLinkLocalOif(self):
    612     """Checks that ping to link-local addresses works correctly.
    613 
    614     Relevant kernel commits:
    615       upstream net:
    616         5e45789 net: ipv6: Fix ping to link-local addresses.
    617     """
    618     s = net_test.IPv6PingSocket()
    619     for mode in ["oif", "ucast_oif", None]:
    620       s = net_test.IPv6PingSocket()
    621       for netid in self.NETIDS:
    622         dst = self._RouterAddress(netid, 6)
    623         self.assertTrue(dst.startswith("fe80:"))
    624 
    625         if mode:
    626           self.SelectInterface(s, netid, mode)
    627           scopeid = 0
    628         else:
    629           scopeid = self.ifindices[netid]
    630 
    631         if mode == "oif":
    632           # If SO_BINDTODEVICE has been set, any attempt to send on another
    633           # interface returns EINVAL.
    634           othernetid = self.NETIDS[(self.NETIDS.index(netid) + 1)
    635                                    % len(self.NETIDS)]
    636           otherscopeid = self.ifindices[othernetid]
    637           self.assertRaisesErrno(
    638               errno.EINVAL,
    639               s.sendto, net_test.IPV6_PING, (dst, 55, 0, otherscopeid))
    640 
    641         s.sendto(net_test.IPV6_PING, (dst, 123, 0, scopeid))
    642         # If we got a reply, we sent the packet out on the right interface.
    643         self.assertValidPingResponse(s, net_test.IPV6_PING)
    644 
    645   def testMappedAddressFails(self):
    646     s = net_test.IPv6PingSocket()
    647     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    648     self.assertValidPingResponse(s, net_test.IPV6_PING)
    649     s.sendto(net_test.IPV6_PING, ("2001:4860:4860::8844", 55))
    650     self.assertValidPingResponse(s, net_test.IPV6_PING)
    651     self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING,
    652                            ("::ffff:192.0.2.1", 55))
    653 
    654   @unittest.skipUnless(False, "skipping: does not work yet")
    655   def testFlowLabel(self):
    656     s = net_test.IPv6PingSocket()
    657 
    658     # Specifying a flowlabel without having set IPV6_FLOWINFO_SEND succeeds but
    659     # the flow label in the packet is not set.
    660     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0))
    661     self.assertValidPingResponse(s, net_test.IPV6_PING)  # Checks flow label==0.
    662 
    663     # If IPV6_FLOWINFO_SEND is set on the socket, attempting to set a flow label
    664     # that is not registered with the flow manager should return EINVAL...
    665     s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_FLOWINFO_SEND, 1)
    666     # ... but this doesn't work yet.
    667     if False:
    668       self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING,
    669                              (net_test.IPV6_ADDR, 93, 0xdead, 0))
    670 
    671     # After registering the flow label, it gets sent properly, appears in the
    672     # output packet, and is returned in the response.
    673     net_test.SetFlowLabel(s, net_test.IPV6_ADDR, 0xdead)
    674     self.assertEqual(1, s.getsockopt(net_test.SOL_IPV6,
    675                                      net_test.IPV6_FLOWINFO_SEND))
    676     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0))
    677     _, src = s.recvfrom(32768)
    678     _, _, flowlabel, _ = src
    679     self.assertEqual(0xdead, flowlabel & 0xfffff)
    680 
    681   def testIPv4Error(self):
    682     s = net_test.IPv4PingSocket()
    683     s.setsockopt(SOL_IP, IP_TTL, 2)
    684     s.setsockopt(SOL_IP, net_test.IP_RECVERR, 1)
    685     s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
    686     # We can't check the actual error because Python 2.7 doesn't implement
    687     # recvmsg, but we can at least check that the socket returns an error.
    688     self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768)  # No response.
    689 
    690   def testIPv6Error(self):
    691     s = net_test.IPv6PingSocket()
    692     s.setsockopt(net_test.SOL_IPV6, IPV6_UNICAST_HOPS, 2)
    693     s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_RECVERR, 1)
    694     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    695     # We can't check the actual error because Python 2.7 doesn't implement
    696     # recvmsg, but we can at least check that the socket returns an error.
    697     self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768)  # No response.
    698 
    699   def testIPv6MulticastPing(self):
    700     s = net_test.IPv6PingSocket()
    701     # Send a multicast ping and check we get at least one duplicate.
    702     # The setsockopt should not be necessary, but ping_v6_sendmsg has a bug.
    703     s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex)
    704     s.sendto(net_test.IPV6_PING, ("ff02::1", 55, 0, self.ifindex))
    705     self.assertValidPingResponse(s, net_test.IPV6_PING)
    706     self.assertValidPingResponse(s, net_test.IPV6_PING)
    707 
    708   def testIPv4LargePacket(self):
    709     s = net_test.IPv4PingSocket()
    710     data = net_test.IPV4_PING + 20000 * "a"
    711     s.sendto(data, ("127.0.0.1", 987))
    712     self.assertValidPingResponse(s, data)
    713 
    714   def testIPv6LargePacket(self):
    715     s = net_test.IPv6PingSocket()
    716     s.bind(("::", 0xace))
    717     data = net_test.IPV6_PING + "\x01" + 19994 * "\x00" + "aaaaa"
    718     s.sendto(data, ("::1", 953))
    719 
    720   @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
    721   def testIcmpSocketsNotInIcmp6(self):
    722     numrows = len(self.ReadProcNetSocket("icmp"))
    723     numrows6 = len(self.ReadProcNetSocket("icmp6"))
    724     s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
    725     s.bind(("127.0.0.1", 0xace))
    726     s.connect(("127.0.0.1", 0xbeef))
    727     self.assertEquals(numrows + 1, len(self.ReadProcNetSocket("icmp")))
    728     self.assertEquals(numrows6, len(self.ReadProcNetSocket("icmp6")))
    729 
    730   @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
    731   def testIcmp6SocketsNotInIcmp(self):
    732     numrows = len(self.ReadProcNetSocket("icmp"))
    733     numrows6 = len(self.ReadProcNetSocket("icmp6"))
    734     s = net_test.IPv6PingSocket()
    735     s.bind(("::1", 0xace))
    736     s.connect(("::1", 0xbeef))
    737     self.assertEquals(numrows, len(self.ReadProcNetSocket("icmp")))
    738     self.assertEquals(numrows6 + 1, len(self.ReadProcNetSocket("icmp6")))
    739 
    740   def testProcNetIcmp(self):
    741     s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
    742     s.bind(("127.0.0.1", 0xace))
    743     s.connect(("127.0.0.1", 0xbeef))
    744     self.CheckSockStatFile("icmp", "127.0.0.1", 0xace, "127.0.0.1", 0xbeef, 1)
    745 
    746   @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
    747   def testProcNetIcmp6(self):
    748     numrows6 = len(self.ReadProcNetSocket("icmp6"))
    749     s = net_test.IPv6PingSocket()
    750     s.bind(("::1", 0xace))
    751     s.connect(("::1", 0xbeef))
    752     self.CheckSockStatFile("icmp6", "::1", 0xace, "::1", 0xbeef, 1)
    753 
    754     # Check the row goes away when the socket is closed.
    755     s.close()
    756     self.assertEquals(numrows6, len(self.ReadProcNetSocket("icmp6")))
    757 
    758     # Try send, bind and connect to check the addresses and the state.
    759     s = net_test.IPv6PingSocket()
    760     self.assertEqual(0, len(self.ReadProcNetSocket("icmp6")))
    761     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 12345))
    762     self.assertEqual(1, len(self.ReadProcNetSocket("icmp6")))
    763 
    764     # Can't bind after sendto, apparently.
    765     s = net_test.IPv6PingSocket()
    766     self.assertEqual(0, len(self.ReadProcNetSocket("icmp6")))
    767     s.bind((self.lladdr, 0xd00d, 0, self.ifindex))
    768     self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "::", 0, 7)
    769 
    770     # Check receive bytes.
    771     s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex)
    772     s.connect(("ff02::1", 0xdead))
    773     self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1)
    774     s.send(net_test.IPV6_PING)
    775     s.recvfrom(32768, MSG_PEEK)  # Wait until the receive thread replies.
    776     self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
    777                            txmem=0, rxmem=0x300)
    778     self.assertValidPingResponse(s, net_test.IPV6_PING)
    779     self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
    780                            txmem=0, rxmem=0)
    781 
    782   def testProcNetUdp6(self):
    783     s = net_test.Socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP)
    784     s.bind(("::1", 0xace))
    785     s.connect(("::1", 0xbeef))
    786     self.CheckSockStatFile("udp6", "::1", 0xace, "::1", 0xbeef, 1)
    787 
    788   def testProcNetRaw6(self):
    789     s = net_test.Socket(AF_INET6, SOCK_RAW, IPPROTO_RAW)
    790     s.bind(("::1", 0xace))
    791     s.connect(("::1", 0xbeef))
    792     self.CheckSockStatFile("raw6", "::1", 0xff, "::1", 0, 1)
    793 
    794   def testIPv6MTU(self):
    795     """Tests IPV6_RECVERR and path MTU discovery on ping sockets.
    796 
    797     Relevant kernel commits:
    798       upstream net-next:
    799         dcb94b8 ipv6: fix endianness error in icmpv6_err
    800     """
    801     s = net_test.IPv6PingSocket()
    802     s.setsockopt(net_test.SOL_IPV6, csocket.IPV6_DONTFRAG, 1)
    803     s.setsockopt(net_test.SOL_IPV6, csocket.IPV6_MTU_DISCOVER, 2)
    804     s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_RECVERR, 1)
    805     s.connect((net_test.IPV6_ADDR, 55))
    806     pkt = net_test.IPV6_PING + (PingReplyThread.LINK_MTU + 100) * "a"
    807     s.send(pkt)
    808     self.assertRaisesErrno(errno.EMSGSIZE, s.recv, 32768)
    809     data, addr, cmsg = csocket.Recvmsg(s, 4096, 1024, csocket.MSG_ERRQUEUE)
    810 
    811     # Compare the offending packet with the one we sent. To do this we need to
    812     # calculate the ident of the packet we sent and blank out the checksum of
    813     # the one we received.
    814     ident = struct.pack("!H", s.getsockname()[1])
    815     pkt = pkt[:4] + ident + pkt[6:]
    816     data = data[:2] + "\x00\x00" + pkt[4:]
    817     self.assertEquals(pkt, data)
    818 
    819     # Check the address that the packet was sent to.
    820     # ... except in 4.1, where it just returns an AF_UNSPEC, like this:
    821     # recvmsg(9, {msg_name(0)={sa_family=AF_UNSPEC,
    822     #     sa_data="\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"},
    823     #     msg_iov(1)=[{"\x80\x00\x04\x6b\x00\xc4\x00\x03\x61\x61\x61\x61\x61\x61"..., 4096}],
    824     #     msg_controllen=64, {cmsg_len=60, cmsg_level=SOL_IPV6, cmsg_type=, ...},
    825     #     msg_flags=MSG_ERRQUEUE}, MSG_ERRQUEUE) = 1232
    826     if net_test.LINUX_VERSION != (4, 1, 0):
    827       self.assertEquals(csocket.Sockaddr(("2001:4860:4860::8888", 0)), addr)
    828 
    829     # Check the cmsg data, including the link MTU.
    830     mtu = PingReplyThread.LINK_MTU
    831     src = self.reply_threads[self.netid].INTERMEDIATE_IPV6
    832     msglist = [
    833         (net_test.SOL_IPV6, net_test.IPV6_RECVERR,
    834          (csocket.SockExtendedErr((errno.EMSGSIZE, csocket.SO_ORIGIN_ICMP6,
    835                                    ICMPV6_PKT_TOOBIG, 0, mtu, 0)),
    836           csocket.Sockaddr((src, 0))))
    837     ]
    838 
    839     # IP[V6]_RECVERR in 3.10 appears to return incorrect data for the port.
    840     # The fix might have been in 676d236, but we don't have that in 3.10 and it
    841     # touches code all over the tree. Instead, just don't check the port.
    842     if net_test.LINUX_VERSION <= (3, 14, 0):
    843       msglist[0][2][1].port = cmsg[0][2][1].port
    844 
    845     self.assertEquals(msglist, cmsg)
    846 
    847 
    848 if __name__ == "__main__":
    849   unittest.main()
    850