Home | History | Annotate | Download | only in test
      1 #!/usr/bin/python
      2 #
      3 # Copyright 2017 The Android Open Source Project
      4 #
      5 # Licensed under the Apache License, Version 2.0 (the "License");
      6 # you may not use this file except in compliance with the License.
      7 # You may obtain a copy of the License at
      8 #
      9 # http://www.apache.org/licenses/LICENSE-2.0
     10 #
     11 # Unless required by applicable law or agreed to in writing, software
     12 # distributed under the License is distributed on an "AS IS" BASIS,
     13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14 # See the License for the specific language governing permissions and
     15 # limitations under the License.
     16 
     17 import errno
     18 import fcntl
     19 from socket import *  # pylint: disable=wildcard-import
     20 import unittest
     21 
     22 import csocket
     23 import cstruct
     24 import net_test
     25 
     26 IPV4_LOOPBACK_ADDR = "127.0.0.1"
     27 IPV6_LOOPBACK_ADDR = "::1"
     28 LOOPBACK_DEV = "lo"
     29 LOOPBACK_IFINDEX = 1
     30 
     31 SIOCKILLADDR = 0x8939
     32 
     33 
     34 Ifreq = cstruct.Struct("Ifreq", "=16s16s", "name data")
     35 In6Ifreq = cstruct.Struct("In6Ifreq", "=16sIi", "addr prefixlen ifindex")
     36 
     37 def KillAddrIoctl(addr):
     38   """Calls the SIOCKILLADDR ioctl on the provided IP address.
     39 
     40   Args:
     41     addr The IP address to pass to the ioctl.
     42 
     43   Raises:
     44     ValueError: If addr is of an unsupported address family.
     45   """
     46   family, _, _, _, _ = getaddrinfo(addr, None, AF_UNSPEC, SOCK_DGRAM, 0,
     47                                    AI_NUMERICHOST)[0]
     48   if family == AF_INET6:
     49     addr = inet_pton(AF_INET6, addr)
     50     ifreq = In6Ifreq((addr, 128, LOOPBACK_IFINDEX)).Pack()
     51   elif family == AF_INET:
     52     addr = inet_pton(AF_INET, addr)
     53     sockaddr = csocket.SockaddrIn((AF_INET, 0, addr)).Pack()
     54     ifreq = Ifreq((LOOPBACK_DEV, sockaddr)).Pack()
     55   else:
     56     raise ValueError('Address family %r not supported.' % family)
     57   datagram_socket = socket(family, SOCK_DGRAM)
     58   try:
     59     fcntl.ioctl(datagram_socket.fileno(), SIOCKILLADDR, ifreq)
     60   finally:
     61     datagram_socket.close()
     62 
     63 # For convenience.
     64 def CreateIPv4SocketPair():
     65   return net_test.CreateSocketPair(AF_INET, SOCK_STREAM, IPV4_LOOPBACK_ADDR)
     66 
     67 def CreateIPv6SocketPair():
     68   return net_test.CreateSocketPair(AF_INET6, SOCK_STREAM, IPV6_LOOPBACK_ADDR)
     69 
     70 
     71 @unittest.skipUnless(net_test.LINUX_VERSION >= (4, 4, 0), "grace period")
     72 class TcpNukeAddrTest(net_test.NetworkTest):
     73 
     74   """Tests that SIOCKILLADDR no longer exists.
     75 
     76   The out-of-tree SIOCKILLADDR was replaced by the upstream SOCK_DESTROY
     77   operation in Linux 4.5. It was backported to common Android trees all the way
     78   back to android-3.10 and is required by CTS from Android N onwards.
     79   This test ensures that it no longer works in 4.4 and above kernels.
     80 
     81   Relevant kernel commits:
     82     android-4.4:
     83       3094efd84c Revert "net: socket ioctl to reset connections matching local address"
     84   """
     85 
     86   def CheckNukeAddrUnsupported(self, socketpair, addr):
     87     s1, s2 = socketpair
     88     self.assertRaisesErrno(errno.ENOTTY, KillAddrIoctl, addr)
     89     data = "foo"
     90     try:
     91       self.assertEquals(len(data), s1.send(data))
     92       self.assertEquals(data, s2.recv(4096))
     93       self.assertSocketsNotClosed(socketpair)
     94     finally:
     95       s1.close()
     96       s2.close()
     97 
     98   def assertSocketsNotClosed(self, socketpair):
     99     for sock in socketpair:
    100       self.assertTrue(sock.getpeername())
    101 
    102   def testIpv4Unsupported(self):
    103     self.CheckNukeAddrUnsupported(CreateIPv4SocketPair(), IPV4_LOOPBACK_ADDR)
    104     self.CheckNukeAddrUnsupported(CreateIPv4SocketPair(), "0.0.0.0")
    105 
    106   def testIpv6Unsupported(self):
    107     self.CheckNukeAddrUnsupported(CreateIPv6SocketPair(), IPV6_LOOPBACK_ADDR)
    108     self.CheckNukeAddrUnsupported(CreateIPv4SocketPair(), "::")
    109 
    110 
    111 if __name__ == "__main__":
    112   unittest.main()
    113