Home | History | Annotate | Download | only in test
      1 #!/usr/bin/python
      2 #
      3 # Copyright 2014 The Android Open Source Project
      4 #
      5 # Licensed under the Apache License, Version 2.0 (the "License");
      6 # you may not use this file except in compliance with the License.
      7 # You may obtain a copy of the License at
      8 #
      9 # http://www.apache.org/licenses/LICENSE-2.0
     10 #
     11 # Unless required by applicable law or agreed to in writing, software
     12 # distributed under the License is distributed on an "AS IS" BASIS,
     13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14 # See the License for the specific language governing permissions and
     15 # limitations under the License.
     16 
     17 # pylint: disable=g-bad-todo
     18 
     19 import errno
     20 import os
     21 import posix
     22 import random
     23 from socket import *  # pylint: disable=wildcard-import
     24 import struct
     25 import sys
     26 import threading
     27 import time
     28 import unittest
     29 
     30 from scapy import all as scapy
     31 
     32 import csocket
     33 import multinetwork_base
     34 import net_test
     35 
     36 
     37 HAVE_PROC_NET_ICMP6 = os.path.isfile("/proc/net/icmp6")
     38 
     39 ICMP_ECHO = 8
     40 ICMP_ECHOREPLY = 0
     41 ICMPV6_ECHO_REQUEST = 128
     42 ICMPV6_ECHO_REPLY = 129
     43 IPV6_MIN_MTU = 1280
     44 ICMPV6_HEADER_LEN = 8
     45 ICMPV6_PKT_TOOBIG = 2
     46 
     47 
     48 class PingReplyThread(threading.Thread):
     49 
     50   MIN_TTL = 10
     51   INTERMEDIATE_IPV4 = "192.0.2.2"
     52   INTERMEDIATE_IPV6 = "2001:db8:1:2::ace:d00d"
     53   NEIGHBOURS = ["fe80::1"]
     54   LINK_MTU = 1300
     55 
     56   def __init__(self, tun, mymac, routermac, routeraddr):
     57     super(PingReplyThread, self).__init__()
     58     self._tun = tun
     59     self._started = False
     60     self._stopped = False
     61     self._mymac = mymac
     62     self._routermac = routermac
     63     self._routeraddr = routeraddr
     64 
     65   def IsStarted(self):
     66     return self._started
     67 
     68   def Stop(self):
     69     self._stopped = True
     70 
     71   def ChecksumValid(self, packet):
     72     # Get and clear the checksums.
     73     def GetAndClearChecksum(layer):
     74       if not layer:
     75         return
     76       try:
     77         checksum = layer.chksum
     78         del layer.chksum
     79       except AttributeError:
     80         checksum = layer.cksum
     81         del layer.cksum
     82       return checksum
     83 
     84     def GetChecksum(layer):
     85       try:
     86         return layer.chksum
     87       except AttributeError:
     88         return layer.cksum
     89 
     90     layers = ["IP", "ICMP", scapy.ICMPv6EchoRequest]
     91     sums = {}
     92     for name in layers:
     93       sums[name] = GetAndClearChecksum(packet.getlayer(name))
     94 
     95     # Serialize the packet, so scapy recalculates the checksums, and compare
     96     # them with the ones in the packet.
     97     packet = packet.__class__(str(packet))
     98     for name in layers:
     99       layer = packet.getlayer(name)
    100       if layer and GetChecksum(layer) != sums[name]:
    101         return False
    102 
    103     return True
    104 
    105   def SendTimeExceeded(self, version, packet):
    106     if version == 4:
    107       src = packet.getlayer(scapy.IP).src
    108       self.SendPacket(
    109           scapy.IP(src=self.INTERMEDIATE_IPV4, dst=src) /
    110           scapy.ICMP(type=11, code=0) /
    111           packet)
    112     elif version == 6:
    113       src = packet.getlayer(scapy.IPv6).src
    114       self.SendPacket(
    115           scapy.IPv6(src=self.INTERMEDIATE_IPV6, dst=src) /
    116           scapy.ICMPv6TimeExceeded(code=0) /
    117           packet)
    118 
    119   def SendPacketTooBig(self, packet):
    120       src = packet.getlayer(scapy.IPv6).src
    121       datalen = IPV6_MIN_MTU - ICMPV6_HEADER_LEN
    122       self.SendPacket(
    123           scapy.IPv6(src=self.INTERMEDIATE_IPV6, dst=src) /
    124           scapy.ICMPv6PacketTooBig(mtu=self.LINK_MTU) /
    125           str(packet)[:datalen])
    126 
    127   def IPv4Packet(self, ip):
    128     icmp = ip.getlayer(scapy.ICMP)
    129 
    130     # We only support ping for now.
    131     if (ip.proto != IPPROTO_ICMP or
    132         icmp.type != ICMP_ECHO or
    133         icmp.code != 0):
    134       return
    135 
    136     # Check the checksums.
    137     if not self.ChecksumValid(ip):
    138       return
    139 
    140     if ip.ttl < self.MIN_TTL:
    141       self.SendTimeExceeded(4, ip)
    142       return
    143 
    144     icmp.type = ICMP_ECHOREPLY
    145     self.SwapAddresses(ip)
    146     self.SendPacket(ip)
    147 
    148   def IPv6Packet(self, ipv6):
    149     icmpv6 = ipv6.getlayer(scapy.ICMPv6EchoRequest)
    150 
    151     # We only support ping for now.
    152     if (ipv6.nh != IPPROTO_ICMPV6 or
    153         not icmpv6 or
    154         icmpv6.type != ICMPV6_ECHO_REQUEST or
    155         icmpv6.code != 0):
    156       return
    157 
    158     # Check the checksums.
    159     if not self.ChecksumValid(ipv6):
    160       return
    161 
    162     if ipv6.dst.startswith("ff02::"):
    163       ipv6.dst = ipv6.src
    164       for src in [self._routeraddr]:
    165         ipv6.src = src
    166         icmpv6.type = ICMPV6_ECHO_REPLY
    167         self.SendPacket(ipv6)
    168     elif ipv6.hlim < self.MIN_TTL:
    169       self.SendTimeExceeded(6, ipv6)
    170     elif ipv6.plen > self.LINK_MTU:
    171       self.SendPacketTooBig(ipv6)
    172     else:
    173       icmpv6.type = ICMPV6_ECHO_REPLY
    174       if ipv6.dst.startswith("fe80:") and ipv6.dst != self._routeraddr:
    175         return
    176       self.SwapAddresses(ipv6)
    177       self.SendPacket(ipv6)
    178 
    179   def SwapAddresses(self, packet):
    180     src = packet.src
    181     packet.src = packet.dst
    182     packet.dst = src
    183 
    184   def SendPacket(self, packet):
    185     packet = scapy.Ether(src=self._routermac, dst=self._mymac) / packet
    186     try:
    187       posix.write(self._tun.fileno(), str(packet))
    188     except Exception, e:
    189       if not self._stopped:
    190         raise e
    191 
    192   def run(self):
    193     self._started = True
    194     while not self._stopped:
    195       try:
    196         packet = posix.read(self._tun.fileno(), 4096)
    197       except OSError, e:
    198         if e.errno == errno.EAGAIN:
    199           continue
    200         else:
    201           break
    202       except ValueError, e:
    203         if not self._stopped:
    204           raise e
    205 
    206       ether = scapy.Ether(packet)
    207       if ether.type == net_test.ETH_P_IPV6:
    208         self.IPv6Packet(ether.payload)
    209       elif ether.type == net_test.ETH_P_IP:
    210         self.IPv4Packet(ether.payload)
    211 
    212 
    213 class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
    214 
    215   @classmethod
    216   def WaitForReplyThreads(cls):
    217     # Wait 2s for the reply threads to start. If they don't, don't blow up, as
    218     # that would cause tearDownClass not to be called and thus not clean up
    219     # routing configuration, breaking subsequent tests. Instead, just let these
    220     # tests fail.
    221     _INTERVAL = 0.1
    222     _ATTEMPTS = 20
    223     for i in xrange(0, _ATTEMPTS):
    224       for netid in cls.NETIDS:
    225         if all(thread.IsStarted() for thread in cls.reply_threads.values()):
    226           return
    227         time.sleep(_INTERVAL)
    228     msg = "WARNING: reply threads not all started after %.1f seconds\n" % (
    229         _ATTEMPTS * _INTERVAL)
    230     sys.stderr.write(msg)
    231 
    232   @classmethod
    233   def StopReplyThreads(cls):
    234     for thread in cls.reply_threads.values():
    235       thread.Stop()
    236 
    237   @classmethod
    238   def setUpClass(cls):
    239     super(Ping6Test, cls).setUpClass()
    240     cls.reply_threads = {}
    241     for netid in cls.NETIDS:
    242       cls.reply_threads[netid] = PingReplyThread(
    243         cls.tuns[netid],
    244         cls.MyMacAddress(netid),
    245         cls.RouterMacAddress(netid),
    246         cls._RouterAddress(netid, 6))
    247       cls.reply_threads[netid].start()
    248     cls.WaitForReplyThreads()
    249     cls.netid = random.choice(cls.NETIDS)
    250     cls.SetDefaultNetwork(cls.netid)
    251 
    252   @classmethod
    253   def tearDownClass(cls):
    254     cls.StopReplyThreads()
    255     cls.ClearDefaultNetwork()
    256     super(Ping6Test, cls).tearDownClass()
    257 
    258   def setUp(self):
    259     self.ifname = self.GetInterfaceName(self.netid)
    260     self.ifindex = self.ifindices[self.netid]
    261     self.lladdr = net_test.GetLinkAddress(self.ifname, True)
    262     self.globaladdr = net_test.GetLinkAddress(self.ifname, False)
    263 
    264   def assertValidPingResponse(self, s, data):
    265     family = s.family
    266 
    267     # Receive the reply.
    268     rcvd, src = s.recvfrom(32768)
    269     self.assertNotEqual(0, len(rcvd), "No data received")
    270 
    271     # If this is a dual-stack socket sending to a mapped IPv4 address, treat it
    272     # as IPv4.
    273     if src[0].startswith("::ffff:"):
    274       family = AF_INET
    275       src = (src[0].replace("::ffff:", ""), src[1:])
    276 
    277     # Check the data being sent is valid.
    278     self.assertGreater(len(data), 7, "Not enough data for ping packet")
    279     if family == AF_INET:
    280       self.assertTrue(data.startswith("\x08\x00"), "Not an IPv4 echo request")
    281     elif family == AF_INET6:
    282       self.assertTrue(data.startswith("\x80\x00"), "Not an IPv6 echo request")
    283     else:
    284       self.fail("Unknown socket address family %d" * s.family)
    285 
    286     # Check address, ICMP type, and ICMP code.
    287     if family == AF_INET:
    288       addr, unused_port = src
    289       self.assertGreaterEqual(len(addr), len("1.1.1.1"))
    290       self.assertTrue(rcvd.startswith("\x00\x00"), "Not an IPv4 echo reply")
    291     else:
    292       addr, unused_port, flowlabel, scope_id = src  # pylint: disable=unbalanced-tuple-unpacking
    293       self.assertGreaterEqual(len(addr), len("::"))
    294       self.assertTrue(rcvd.startswith("\x81\x00"), "Not an IPv6 echo reply")
    295       # Check that the flow label is zero and that the scope ID is sane.
    296       self.assertEqual(flowlabel, 0)
    297       if addr.startswith("fe80::"):
    298         self.assertTrue(scope_id in self.ifindices.values())
    299       else:
    300         self.assertEquals(0, scope_id)
    301 
    302     # TODO: check the checksum. We can't do this easily now for ICMPv6 because
    303     # we don't have the IP addresses so we can't construct the pseudoheader.
    304 
    305     # Check the sequence number and the data.
    306     self.assertEqual(len(data), len(rcvd))
    307     self.assertEqual(data[6:].encode("hex"), rcvd[6:].encode("hex"))
    308 
    309   def CheckSockStatFile(self, name, srcaddr, srcport, dstaddr, dstport, state,
    310                         txmem=0, rxmem=0):
    311     expected = ["%s:%04X" % (net_test.FormatSockStatAddress(srcaddr), srcport),
    312                 "%s:%04X" % (net_test.FormatSockStatAddress(dstaddr), dstport),
    313                 "%02X" % state,
    314                 "%08X:%08X" % (txmem, rxmem),
    315                 str(os.getuid()), "2", "0"]
    316     actual = self.ReadProcNetSocket(name)[-1]
    317     self.assertListEqual(expected, actual)
    318 
    319   def testIPv4SendWithNoConnection(self):
    320     s = net_test.IPv4PingSocket()
    321     self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV4_PING)
    322 
    323   def testIPv6SendWithNoConnection(self):
    324     s = net_test.IPv6PingSocket()
    325     self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV6_PING)
    326 
    327   def testIPv4LoopbackPingWithConnect(self):
    328     s = net_test.IPv4PingSocket()
    329     s.connect(("127.0.0.1", 55))
    330     data = net_test.IPV4_PING + "foobarbaz"
    331     s.send(data)
    332     self.assertValidPingResponse(s, data)
    333 
    334   def testIPv6LoopbackPingWithConnect(self):
    335     s = net_test.IPv6PingSocket()
    336     s.connect(("::1", 55))
    337     s.send(net_test.IPV6_PING)
    338     self.assertValidPingResponse(s, net_test.IPV6_PING)
    339 
    340   def testIPv4PingUsingSendto(self):
    341     s = net_test.IPv4PingSocket()
    342     written = s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
    343     self.assertEquals(len(net_test.IPV4_PING), written)
    344     self.assertValidPingResponse(s, net_test.IPV4_PING)
    345 
    346   def testIPv6PingUsingSendto(self):
    347     s = net_test.IPv6PingSocket()
    348     written = s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    349     self.assertEquals(len(net_test.IPV6_PING), written)
    350     self.assertValidPingResponse(s, net_test.IPV6_PING)
    351 
    352   def testIPv4NoCrash(self):
    353     # Python 2.x does not provide either read() or recvmsg.
    354     s = net_test.IPv4PingSocket()
    355     written = s.sendto(net_test.IPV4_PING, ("127.0.0.1", 55))
    356     self.assertEquals(len(net_test.IPV4_PING), written)
    357     fd = s.fileno()
    358     reply = posix.read(fd, 4096)
    359     self.assertEquals(written, len(reply))
    360 
    361   def testIPv6NoCrash(self):
    362     # Python 2.x does not provide either read() or recvmsg.
    363     s = net_test.IPv6PingSocket()
    364     written = s.sendto(net_test.IPV6_PING, ("::1", 55))
    365     self.assertEquals(len(net_test.IPV6_PING), written)
    366     fd = s.fileno()
    367     reply = posix.read(fd, 4096)
    368     self.assertEquals(written, len(reply))
    369 
    370   def testCrossProtocolCrash(self):
    371     # Checks that an ICMP error containing a ping packet that matches the ID
    372     # of a socket of the wrong protocol (which can happen when using 464xlat)
    373     # doesn't crash the kernel.
    374 
    375     # We can only test this using IPv6 unreachables and IPv4 ping sockets,
    376     # because IPv4 packets sent by scapy.send() on loopback are not received by
    377     # the kernel. So we don't actually use this function yet.
    378     def GetIPv4Unreachable(port):  # pylint: disable=unused-variable
    379       return (scapy.IP(src="192.0.2.1", dst="127.0.0.1") /
    380               scapy.ICMP(type=3, code=0) /
    381               scapy.IP(src="127.0.0.1", dst="127.0.0.1") /
    382               scapy.ICMP(type=8, id=port, seq=1))
    383 
    384     def GetIPv6Unreachable(port):
    385       return (scapy.IPv6(src="::1", dst="::1") /
    386               scapy.ICMPv6DestUnreach() /
    387               scapy.IPv6(src="::1", dst="::1") /
    388               scapy.ICMPv6EchoRequest(id=port, seq=1, data="foobarbaz"))
    389 
    390     # An unreachable matching the ID of a socket of the wrong protocol
    391     # shouldn't crash.
    392     s = net_test.IPv4PingSocket()
    393     s.connect(("127.0.0.1", 12345))
    394     _, port = s.getsockname()
    395     scapy.send(GetIPv6Unreachable(port), verbose=False)
    396     # No crash? Good.
    397 
    398   def testCrossProtocolCalls(self):
    399     """Tests that passing in the wrong family returns EAFNOSUPPORT.
    400 
    401     Relevant kernel commits:
    402       upstream net:
    403         91a0b60 net/ping: handle protocol mismatching scenario
    404         9145736d net: ping: Return EAFNOSUPPORT when appropriate.
    405 
    406       android-3.10:
    407         78a6809 net/ping: handle protocol mismatching scenario
    408         428e6d6 net: ping: Return EAFNOSUPPORT when appropriate.
    409     """
    410 
    411     def CheckEAFNoSupport(function, *args):
    412       self.assertRaisesErrno(errno.EAFNOSUPPORT, function, *args)
    413 
    414     ipv6sockaddr = csocket.Sockaddr((net_test.IPV6_ADDR, 53))
    415 
    416     # In order to check that IPv6 socket calls return EAFNOSUPPORT when passed
    417     # IPv4 socket address structures, we need to pass down a socket address
    418     # length argument that's at least sizeof(sockaddr_in6). Otherwise, the calls
    419     # will fail immediately with EINVAL because the passed-in socket length is
    420     # too short. So create a sockaddr_in that's as long as a sockaddr_in6.
    421     ipv4sockaddr = csocket.Sockaddr((net_test.IPV4_ADDR, 53))
    422     ipv4sockaddr = csocket.SockaddrIn6(
    423         ipv4sockaddr.Pack() +
    424         "\x00" * (len(csocket.SockaddrIn6) - len(csocket.SockaddrIn)))
    425 
    426     s4 = net_test.IPv4PingSocket()
    427     s6 = net_test.IPv6PingSocket()
    428 
    429     # We can't just call s.connect(), s.bind() etc. with a tuple of the wrong
    430     # address family, because the Python implementation will just pass garbage
    431     # down to the kernel. So call the C functions directly.
    432     CheckEAFNoSupport(csocket.Bind, s4, ipv6sockaddr)
    433     CheckEAFNoSupport(csocket.Bind, s6, ipv4sockaddr)
    434     CheckEAFNoSupport(csocket.Connect, s4, ipv6sockaddr)
    435     CheckEAFNoSupport(csocket.Connect, s6, ipv4sockaddr)
    436     CheckEAFNoSupport(csocket.Sendmsg,
    437                       s4, ipv6sockaddr, net_test.IPV4_PING, None, 0)
    438     CheckEAFNoSupport(csocket.Sendmsg,
    439                       s6, ipv4sockaddr, net_test.IPV6_PING, None, 0)
    440 
    441   def testIPv4Bind(self):
    442     # Bind to unspecified address.
    443     s = net_test.IPv4PingSocket()
    444     s.bind(("0.0.0.0", 544))
    445     self.assertEquals(("0.0.0.0", 544), s.getsockname())
    446 
    447     # Bind to loopback.
    448     s = net_test.IPv4PingSocket()
    449     s.bind(("127.0.0.1", 99))
    450     self.assertEquals(("127.0.0.1", 99), s.getsockname())
    451 
    452     # Binding twice is not allowed.
    453     self.assertRaisesErrno(errno.EINVAL, s.bind, ("127.0.0.1", 22))
    454 
    455     # But binding two different sockets to the same ID is allowed.
    456     s2 = net_test.IPv4PingSocket()
    457     s2.bind(("127.0.0.1", 99))
    458     self.assertEquals(("127.0.0.1", 99), s2.getsockname())
    459     s3 = net_test.IPv4PingSocket()
    460     s3.bind(("127.0.0.1", 99))
    461     self.assertEquals(("127.0.0.1", 99), s3.getsockname())
    462 
    463     # If two sockets bind to the same port, the first one to call read() gets
    464     # the response.
    465     s4 = net_test.IPv4PingSocket()
    466     s5 = net_test.IPv4PingSocket()
    467     s4.bind(("0.0.0.0", 167))
    468     s5.bind(("0.0.0.0", 167))
    469     s4.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 44))
    470     self.assertValidPingResponse(s5, net_test.IPV4_PING)
    471     net_test.SetSocketTimeout(s4, 100)
    472     self.assertRaisesErrno(errno.EAGAIN, s4.recv, 32768)
    473 
    474     # If SO_REUSEADDR is turned off, then we get EADDRINUSE.
    475     s6 = net_test.IPv4PingSocket()
    476     s4.setsockopt(SOL_SOCKET, SO_REUSEADDR, 0)
    477     self.assertRaisesErrno(errno.EADDRINUSE, s6.bind, ("0.0.0.0", 167))
    478 
    479     # Can't bind after sendto.
    480     s = net_test.IPv4PingSocket()
    481     s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 9132))
    482     self.assertRaisesErrno(errno.EINVAL, s.bind, ("0.0.0.0", 5429))
    483 
    484   def testIPv6Bind(self):
    485     # Bind to unspecified address.
    486     s = net_test.IPv6PingSocket()
    487     s.bind(("::", 769))
    488     self.assertEquals(("::", 769, 0, 0), s.getsockname())
    489 
    490     # Bind to loopback.
    491     s = net_test.IPv6PingSocket()
    492     s.bind(("::1", 99))
    493     self.assertEquals(("::1", 99, 0, 0), s.getsockname())
    494 
    495     # Binding twice is not allowed.
    496     self.assertRaisesErrno(errno.EINVAL, s.bind, ("::1", 22))
    497 
    498     # But binding two different sockets to the same ID is allowed.
    499     s2 = net_test.IPv6PingSocket()
    500     s2.bind(("::1", 99))
    501     self.assertEquals(("::1", 99, 0, 0), s2.getsockname())
    502     s3 = net_test.IPv6PingSocket()
    503     s3.bind(("::1", 99))
    504     self.assertEquals(("::1", 99, 0, 0), s3.getsockname())
    505 
    506     # Binding both IPv4 and IPv6 to the same socket works.
    507     s4 = net_test.IPv4PingSocket()
    508     s6 = net_test.IPv6PingSocket()
    509     s4.bind(("0.0.0.0", 444))
    510     s6.bind(("::", 666, 0, 0))
    511 
    512     # Can't bind after sendto.
    513     s = net_test.IPv6PingSocket()
    514     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 9132))
    515     self.assertRaisesErrno(errno.EINVAL, s.bind, ("::", 5429))
    516 
    517   def testIPv4InvalidBind(self):
    518     s = net_test.IPv4PingSocket()
    519     self.assertRaisesErrno(errno.EADDRNOTAVAIL,
    520                            s.bind, ("255.255.255.255", 1026))
    521     self.assertRaisesErrno(errno.EADDRNOTAVAIL,
    522                            s.bind, ("224.0.0.1", 651))
    523     # Binding to an address we don't have only works with IP_TRANSPARENT.
    524     self.assertRaisesErrno(errno.EADDRNOTAVAIL,
    525                            s.bind, (net_test.IPV4_ADDR, 651))
    526     try:
    527       s.setsockopt(SOL_IP, net_test.IP_TRANSPARENT, 1)
    528       s.bind((net_test.IPV4_ADDR, 651))
    529     except IOError, e:
    530       if e.errno == errno.EACCES:
    531         pass  # We're not root. let it go for now.
    532 
    533   def testIPv6InvalidBind(self):
    534     s = net_test.IPv6PingSocket()
    535     self.assertRaisesErrno(errno.EINVAL,
    536                            s.bind, ("ff02::2", 1026))
    537 
    538     # Binding to an address we don't have only works with IPV6_TRANSPARENT.
    539     self.assertRaisesErrno(errno.EADDRNOTAVAIL,
    540                            s.bind, (net_test.IPV6_ADDR, 651))
    541     try:
    542       s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_TRANSPARENT, 1)
    543       s.bind((net_test.IPV6_ADDR, 651))
    544     except IOError, e:
    545       if e.errno == errno.EACCES:
    546         pass  # We're not root. let it go for now.
    547 
    548   def testAfUnspecBind(self):
    549     # Binding to AF_UNSPEC is treated as IPv4 if the address is 0.0.0.0.
    550     s4 = net_test.IPv4PingSocket()
    551     sockaddr = csocket.Sockaddr(("0.0.0.0", 12996))
    552     sockaddr.family = AF_UNSPEC
    553     csocket.Bind(s4, sockaddr)
    554     self.assertEquals(("0.0.0.0", 12996), s4.getsockname())
    555 
    556     # But not if the address is anything else.
    557     sockaddr = csocket.Sockaddr(("127.0.0.1", 58234))
    558     sockaddr.family = AF_UNSPEC
    559     self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s4, sockaddr)
    560 
    561     # This doesn't work for IPv6.
    562     s6 = net_test.IPv6PingSocket()
    563     sockaddr = csocket.Sockaddr(("::1", 58997))
    564     sockaddr.family = AF_UNSPEC
    565     self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s6, sockaddr)
    566 
    567   def testIPv6ScopedBind(self):
    568     # Can't bind to a link-local address without a scope ID.
    569     s = net_test.IPv6PingSocket()
    570     self.assertRaisesErrno(errno.EINVAL,
    571                            s.bind, (self.lladdr, 1026, 0, 0))
    572 
    573     # Binding to a link-local address with a scope ID works, and the scope ID is
    574     # returned by a subsequent getsockname. Interestingly, Python's getsockname
    575     # returns "fe80:1%foo", even though it does not understand it.
    576     expected = self.lladdr + "%" + self.ifname
    577     s.bind((self.lladdr, 4646, 0, self.ifindex))
    578     self.assertEquals((expected, 4646, 0, self.ifindex), s.getsockname())
    579 
    580     # Of course, for the above to work the address actually has to be configured
    581     # on the machine.
    582     self.assertRaisesErrno(errno.EADDRNOTAVAIL,
    583                            s.bind, ("fe80::f00", 1026, 0, 1))
    584 
    585     # Scope IDs on non-link-local addresses are silently ignored.
    586     s = net_test.IPv6PingSocket()
    587     s.bind(("::1", 1234, 0, 1))
    588     self.assertEquals(("::1", 1234, 0, 0), s.getsockname())
    589 
    590   def testBindAffectsIdentifier(self):
    591     s = net_test.IPv6PingSocket()
    592     s.bind((self.globaladdr, 0xf976))
    593     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    594     self.assertEquals("\xf9\x76", s.recv(32768)[4:6])
    595 
    596     s = net_test.IPv6PingSocket()
    597     s.bind((self.globaladdr, 0xace))
    598     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    599     self.assertEquals("\x0a\xce", s.recv(32768)[4:6])
    600 
    601   def testLinkLocalAddress(self):
    602     s = net_test.IPv6PingSocket()
    603     # Sending to a link-local address with no scope fails with EINVAL.
    604     self.assertRaisesErrno(errno.EINVAL,
    605                            s.sendto, net_test.IPV6_PING, ("fe80::1", 55))
    606     # Sending to link-local address with a scope succeeds. Note that Python
    607     # doesn't understand the "fe80::1%lo" format, even though it returns it.
    608     s.sendto(net_test.IPV6_PING, ("fe80::1", 55, 0, self.ifindex))
    609     # No exceptions? Good.
    610 
    611   def testLinkLocalOif(self):
    612     """Checks that ping to link-local addresses works correctly.
    613 
    614     Relevant kernel commits:
    615       upstream net:
    616         5e45789 net: ipv6: Fix ping to link-local addresses.
    617     """
    618     for mode in ["oif", "ucast_oif", None]:
    619       s = net_test.IPv6PingSocket()
    620       for netid in self.NETIDS:
    621         s2 = net_test.IPv6PingSocket()
    622         dst = self._RouterAddress(netid, 6)
    623         self.assertTrue(dst.startswith("fe80:"))
    624 
    625         if mode:
    626           self.SelectInterface(s, netid, mode)
    627           self.SelectInterface(s2, netid, mode)
    628           scopeid = 0
    629         else:
    630           scopeid = self.ifindices[netid]
    631 
    632         if mode == "oif":
    633           # If SO_BINDTODEVICE has been set, any attempt to send on another
    634           # interface returns EINVAL.
    635           othernetid = self.NETIDS[(self.NETIDS.index(netid) + 1)
    636                                    % len(self.NETIDS)]
    637           otherscopeid = self.ifindices[othernetid]
    638           self.assertRaisesErrno(
    639               errno.EINVAL,
    640               s.sendto, net_test.IPV6_PING, (dst, 55, 0, otherscopeid))
    641           self.assertRaisesErrno(
    642               errno.EINVAL,
    643               s.connect, (dst, 55, 0, otherscopeid))
    644 
    645         # Try using both sendto and connect/send.
    646         # If we get a reply, we sent the packet out on the right interface.
    647         s.sendto(net_test.IPV6_PING, (dst, 123, 0, scopeid))
    648         self.assertValidPingResponse(s, net_test.IPV6_PING)
    649 
    650         # IPV6_UNICAST_IF doesn't work on connected sockets.
    651         if mode != "ucast_oif":
    652           s2.connect((dst, 123, 0, scopeid))
    653           s2.send(net_test.IPV6_PING)
    654           self.assertValidPingResponse(s2, net_test.IPV6_PING)
    655 
    656   def testMappedAddressFails(self):
    657     s = net_test.IPv6PingSocket()
    658     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    659     self.assertValidPingResponse(s, net_test.IPV6_PING)
    660     s.sendto(net_test.IPV6_PING, ("2001:4860:4860::8844", 55))
    661     self.assertValidPingResponse(s, net_test.IPV6_PING)
    662     self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING,
    663                            ("::ffff:192.0.2.1", 55))
    664 
    665   @unittest.skipUnless(False, "skipping: does not work yet")
    666   def testFlowLabel(self):
    667     s = net_test.IPv6PingSocket()
    668 
    669     # Specifying a flowlabel without having set IPV6_FLOWINFO_SEND succeeds but
    670     # the flow label in the packet is not set.
    671     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0))
    672     self.assertValidPingResponse(s, net_test.IPV6_PING)  # Checks flow label==0.
    673 
    674     # If IPV6_FLOWINFO_SEND is set on the socket, attempting to set a flow label
    675     # that is not registered with the flow manager should return EINVAL...
    676     s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_FLOWINFO_SEND, 1)
    677     # ... but this doesn't work yet.
    678     if False:
    679       self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING,
    680                              (net_test.IPV6_ADDR, 93, 0xdead, 0))
    681 
    682     # After registering the flow label, it gets sent properly, appears in the
    683     # output packet, and is returned in the response.
    684     net_test.SetFlowLabel(s, net_test.IPV6_ADDR, 0xdead)
    685     self.assertEqual(1, s.getsockopt(net_test.SOL_IPV6,
    686                                      net_test.IPV6_FLOWINFO_SEND))
    687     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0))
    688     _, src = s.recvfrom(32768)
    689     _, _, flowlabel, _ = src
    690     self.assertEqual(0xdead, flowlabel & 0xfffff)
    691 
    692   def testIPv4Error(self):
    693     s = net_test.IPv4PingSocket()
    694     s.setsockopt(SOL_IP, IP_TTL, 2)
    695     s.setsockopt(SOL_IP, net_test.IP_RECVERR, 1)
    696     s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
    697     # We can't check the actual error because Python 2.7 doesn't implement
    698     # recvmsg, but we can at least check that the socket returns an error.
    699     self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768)  # No response.
    700 
    701   def testIPv6Error(self):
    702     s = net_test.IPv6PingSocket()
    703     s.setsockopt(net_test.SOL_IPV6, IPV6_UNICAST_HOPS, 2)
    704     s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_RECVERR, 1)
    705     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
    706     # We can't check the actual error because Python 2.7 doesn't implement
    707     # recvmsg, but we can at least check that the socket returns an error.
    708     self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768)  # No response.
    709 
    710   def testIPv6MulticastPing(self):
    711     s = net_test.IPv6PingSocket()
    712     # Send a multicast ping and check we get at least one duplicate.
    713     # The setsockopt should not be necessary, but ping_v6_sendmsg has a bug.
    714     s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex)
    715     s.sendto(net_test.IPV6_PING, ("ff02::1", 55, 0, self.ifindex))
    716     self.assertValidPingResponse(s, net_test.IPV6_PING)
    717     self.assertValidPingResponse(s, net_test.IPV6_PING)
    718 
    719   def testIPv4LargePacket(self):
    720     s = net_test.IPv4PingSocket()
    721     data = net_test.IPV4_PING + 20000 * "a"
    722     s.sendto(data, ("127.0.0.1", 987))
    723     self.assertValidPingResponse(s, data)
    724 
    725   def testIPv6LargePacket(self):
    726     s = net_test.IPv6PingSocket()
    727     s.bind(("::", 0xace))
    728     data = net_test.IPV6_PING + "\x01" + 19994 * "\x00" + "aaaaa"
    729     s.sendto(data, ("::1", 953))
    730 
    731   @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
    732   def testIcmpSocketsNotInIcmp6(self):
    733     numrows = len(self.ReadProcNetSocket("icmp"))
    734     numrows6 = len(self.ReadProcNetSocket("icmp6"))
    735     s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
    736     s.bind(("127.0.0.1", 0xace))
    737     s.connect(("127.0.0.1", 0xbeef))
    738     self.assertEquals(numrows + 1, len(self.ReadProcNetSocket("icmp")))
    739     self.assertEquals(numrows6, len(self.ReadProcNetSocket("icmp6")))
    740 
    741   @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
    742   def testIcmp6SocketsNotInIcmp(self):
    743     numrows = len(self.ReadProcNetSocket("icmp"))
    744     numrows6 = len(self.ReadProcNetSocket("icmp6"))
    745     s = net_test.IPv6PingSocket()
    746     s.bind(("::1", 0xace))
    747     s.connect(("::1", 0xbeef))
    748     self.assertEquals(numrows, len(self.ReadProcNetSocket("icmp")))
    749     self.assertEquals(numrows6 + 1, len(self.ReadProcNetSocket("icmp6")))
    750 
    751   def testProcNetIcmp(self):
    752     s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
    753     s.bind(("127.0.0.1", 0xace))
    754     s.connect(("127.0.0.1", 0xbeef))
    755     self.CheckSockStatFile("icmp", "127.0.0.1", 0xace, "127.0.0.1", 0xbeef, 1)
    756 
    757   @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
    758   def testProcNetIcmp6(self):
    759     numrows6 = len(self.ReadProcNetSocket("icmp6"))
    760     s = net_test.IPv6PingSocket()
    761     s.bind(("::1", 0xace))
    762     s.connect(("::1", 0xbeef))
    763     self.CheckSockStatFile("icmp6", "::1", 0xace, "::1", 0xbeef, 1)
    764 
    765     # Check the row goes away when the socket is closed.
    766     s.close()
    767     self.assertEquals(numrows6, len(self.ReadProcNetSocket("icmp6")))
    768 
    769     # Try send, bind and connect to check the addresses and the state.
    770     s = net_test.IPv6PingSocket()
    771     self.assertEqual(0, len(self.ReadProcNetSocket("icmp6")))
    772     s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 12345))
    773     self.assertEqual(1, len(self.ReadProcNetSocket("icmp6")))
    774 
    775     # Can't bind after sendto, apparently.
    776     s = net_test.IPv6PingSocket()
    777     self.assertEqual(0, len(self.ReadProcNetSocket("icmp6")))
    778     s.bind((self.lladdr, 0xd00d, 0, self.ifindex))
    779     self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "::", 0, 7)
    780 
    781     # Check receive bytes.
    782     s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_MULTICAST_IF, self.ifindex)
    783     s.connect(("ff02::1", 0xdead))
    784     self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1)
    785     s.send(net_test.IPV6_PING)
    786     s.recvfrom(32768, MSG_PEEK)  # Wait until the receive thread replies.
    787     self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
    788                            txmem=0, rxmem=0x300)
    789     self.assertValidPingResponse(s, net_test.IPV6_PING)
    790     self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
    791                            txmem=0, rxmem=0)
    792 
    793   def testProcNetUdp6(self):
    794     s = net_test.Socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP)
    795     s.bind(("::1", 0xace))
    796     s.connect(("::1", 0xbeef))
    797     self.CheckSockStatFile("udp6", "::1", 0xace, "::1", 0xbeef, 1)
    798 
    799   def testProcNetRaw6(self):
    800     s = net_test.Socket(AF_INET6, SOCK_RAW, IPPROTO_RAW)
    801     s.bind(("::1", 0xace))
    802     s.connect(("::1", 0xbeef))
    803     self.CheckSockStatFile("raw6", "::1", 0xff, "::1", 0, 1)
    804 
    805   def testIPv6MTU(self):
    806     """Tests IPV6_RECVERR and path MTU discovery on ping sockets.
    807 
    808     Relevant kernel commits:
    809       upstream net-next:
    810         dcb94b8 ipv6: fix endianness error in icmpv6_err
    811     """
    812     s = net_test.IPv6PingSocket()
    813     s.setsockopt(net_test.SOL_IPV6, csocket.IPV6_DONTFRAG, 1)
    814     s.setsockopt(net_test.SOL_IPV6, csocket.IPV6_MTU_DISCOVER, 2)
    815     s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_RECVERR, 1)
    816     s.connect((net_test.IPV6_ADDR, 55))
    817     pkt = net_test.IPV6_PING + (PingReplyThread.LINK_MTU + 100) * "a"
    818     s.send(pkt)
    819     self.assertRaisesErrno(errno.EMSGSIZE, s.recv, 32768)
    820     data, addr, cmsg = csocket.Recvmsg(s, 4096, 1024, csocket.MSG_ERRQUEUE)
    821 
    822     # Compare the offending packet with the one we sent. To do this we need to
    823     # calculate the ident of the packet we sent and blank out the checksum of
    824     # the one we received.
    825     ident = struct.pack("!H", s.getsockname()[1])
    826     pkt = pkt[:4] + ident + pkt[6:]
    827     data = data[:2] + "\x00\x00" + pkt[4:]
    828     self.assertEquals(pkt, data)
    829 
    830     # Check the address that the packet was sent to.
    831     # ... except in 4.1, where it just returns an AF_UNSPEC, like this:
    832     # recvmsg(9, {msg_name(0)={sa_family=AF_UNSPEC,
    833     #     sa_data="\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"},
    834     #     msg_iov(1)=[{"\x80\x00\x04\x6b\x00\xc4\x00\x03\x61\x61\x61\x61\x61\x61"..., 4096}],
    835     #     msg_controllen=64, {cmsg_len=60, cmsg_level=SOL_IPV6, cmsg_type=, ...},
    836     #     msg_flags=MSG_ERRQUEUE}, MSG_ERRQUEUE) = 1232
    837     if net_test.LINUX_VERSION != (4, 1, 0):
    838       self.assertEquals(csocket.Sockaddr(("2001:4860:4860::8888", 0)), addr)
    839 
    840     # Check the cmsg data, including the link MTU.
    841     mtu = PingReplyThread.LINK_MTU
    842     src = self.reply_threads[self.netid].INTERMEDIATE_IPV6
    843     msglist = [
    844         (net_test.SOL_IPV6, net_test.IPV6_RECVERR,
    845          (csocket.SockExtendedErr((errno.EMSGSIZE, csocket.SO_ORIGIN_ICMP6,
    846                                    ICMPV6_PKT_TOOBIG, 0, mtu, 0)),
    847           csocket.Sockaddr((src, 0))))
    848     ]
    849 
    850     # IP[V6]_RECVERR in 3.10 appears to return incorrect data for the port.
    851     # The fix might have been in 676d236, but we don't have that in 3.10 and it
    852     # touches code all over the tree. Instead, just don't check the port.
    853     if net_test.LINUX_VERSION <= (3, 14, 0):
    854       msglist[0][2][1].port = cmsg[0][2][1].port
    855 
    856     self.assertEquals(msglist, cmsg)
    857 
    858 
# Run the test cases through unittest when this file is executed as a script.
if __name__ == "__main__":
  unittest.main()
    861