Home | History | Annotate | Download | only in test
      1 #!/usr/bin/python
      2 #
      3 # Copyright 2017 The Android Open Source Project
      4 #
      5 # Licensed under the Apache License, Version 2.0 (the "License");
      6 # you may not use this file except in compliance with the License.
      7 # You may obtain a copy of the License at
      8 #
      9 # http://www.apache.org/licenses/LICENSE-2.0
     10 #
     11 # Unless required by applicable law or agreed to in writing, software
     12 # distributed under the License is distributed on an "AS IS" BASIS,
     13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14 # See the License for the specific language governing permissions and
     15 # limitations under the License.
     16 
     17 # pylint: disable=g-bad-todo,g-bad-file-header,wildcard-import
     18 from errno import *  # pylint: disable=wildcard-import
     19 from socket import *  # pylint: disable=wildcard-import
     20 
     21 import random
     22 import struct
     23 import unittest
     24 
     25 from tun_twister import TunTwister
     26 import csocket
     27 import iproute
     28 import multinetwork_base
     29 import net_test
     30 import packets
     31 import xfrm
     32 import xfrm_base
     33 
     34 # Parameters to Set up VTI as a special network
     35 _BASE_VTI_NETID = {4: 40, 6: 60}
     36 _BASE_VTI_OKEY = 2000000100
     37 _BASE_VTI_IKEY = 2000000200
     38 
     39 _VTI_NETID = 50
     40 _VTI_IFNAME = "test_vti"
     41 
     42 _TEST_OUT_SPI = 0x1234
     43 _TEST_IN_SPI = _TEST_OUT_SPI
     44 
     45 _TEST_OKEY = 2000000100
     46 _TEST_IKEY = 2000000200
     47 
     48 
     49 def _GetLocalInnerAddress(version):
     50   return {4: "10.16.5.15", 6: "2001:db8:1::1"}[version]
     51 
     52 
     53 def _GetRemoteInnerAddress(version):
     54   return {4: "10.16.5.20", 6: "2001:db8:2::1"}[version]
     55 
     56 
     57 def _GetRemoteOuterAddress(version):
     58   return {4: net_test.IPV4_ADDR, 6: net_test.IPV6_ADDR}[version]
     59 
     60 
     61 class XfrmTunnelTest(xfrm_base.XfrmLazyTest):
     62 
     63   def _CheckTunnelOutput(self, inner_version, outer_version):
     64     """Test a bi-directional XFRM Tunnel with explicit selectors"""
     65     # Select the underlying netid, which represents the external
     66     # interface from/to which to route ESP packets.
     67     underlying_netid = self.RandomNetid()
     68     # Select a random netid that will originate traffic locally and
     69     # which represents the logical tunnel network.
     70     netid = self.RandomNetid(exclude=underlying_netid)
     71 
     72     local_inner = self.MyAddress(inner_version, netid)
     73     remote_inner = _GetRemoteInnerAddress(inner_version)
     74     local_outer = self.MyAddress(outer_version, underlying_netid)
     75     remote_outer = _GetRemoteOuterAddress(outer_version)
     76 
     77     self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT,
     78                            xfrm.SrcDstSelector(local_inner, remote_inner),
     79                            local_outer, remote_outer, _TEST_OUT_SPI,
     80                            xfrm_base._ALGO_CBC_AES_256,
     81                            xfrm_base._ALGO_HMAC_SHA1,
     82                            None, underlying_netid)
     83 
     84     write_sock = socket(net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0)
     85     # Select an interface, which provides the source address of the inner
     86     # packet.
     87     self.SelectInterface(write_sock, netid, "mark")
     88     write_sock.sendto(net_test.UDP_PAYLOAD, (remote_inner, 53))
     89     self._ExpectEspPacketOn(underlying_netid, _TEST_OUT_SPI, 1, None,
     90                             local_outer, remote_outer)
     91 
     92   # TODO: Add support for the input path.
     93 
     94   def testIpv4InIpv4TunnelOutput(self):
     95     self._CheckTunnelOutput(4, 4)
     96 
     97   def testIpv4InIpv6TunnelOutput(self):
     98     self._CheckTunnelOutput(4, 6)
     99 
    100   def testIpv6InIpv4TunnelOutput(self):
    101     self._CheckTunnelOutput(6, 4)
    102 
    103   def testIpv6InIpv6TunnelOutput(self):
    104     self._CheckTunnelOutput(6, 6)
    105 
    106 
    107 @unittest.skipUnless(net_test.LINUX_VERSION >= (3, 18, 0), "VTI Unsupported")
    108 class XfrmAddDeleteVtiTest(xfrm_base.XfrmBaseTest):
    109   def verifyVtiInfoData(self, vti_info_data, version, local_addr, remote_addr, ikey, okey):
    110     self.assertEquals(vti_info_data["IFLA_VTI_IKEY"], ikey)
    111     self.assertEquals(vti_info_data["IFLA_VTI_OKEY"], okey)
    112 
    113     family = AF_INET if version == 4 else AF_INET6
    114     self.assertEquals(inet_ntop(family, vti_info_data["IFLA_VTI_LOCAL"]), local_addr)
    115     self.assertEquals(inet_ntop(family, vti_info_data["IFLA_VTI_REMOTE"]), remote_addr)
    116 
    117   def testAddVti(self):
    118     """Test the creation of a Virtual Tunnel Interface."""
    119     for version in [4, 6]:
    120       netid = self.RandomNetid()
    121       local_addr = self.MyAddress(version, netid)
    122       self.iproute.CreateVirtualTunnelInterface(
    123           dev_name=_VTI_IFNAME,
    124           local_addr=local_addr,
    125           remote_addr=_GetRemoteOuterAddress(version),
    126           o_key=_TEST_OKEY,
    127           i_key=_TEST_IKEY)
    128       self.verifyVtiInfoData(self.iproute.GetVtiInfoData(_VTI_IFNAME),
    129                              version, local_addr, _GetRemoteOuterAddress(version),
    130                              _TEST_IKEY, _TEST_OKEY)
    131 
    132       new_remote_addr = {4: net_test.IPV4_ADDR2, 6: net_test.IPV6_ADDR2}
    133       new_okey = _TEST_OKEY + _VTI_NETID
    134       new_ikey = _TEST_IKEY + _VTI_NETID
    135       self.iproute.CreateVirtualTunnelInterface(
    136           dev_name=_VTI_IFNAME,
    137           local_addr=local_addr,
    138           remote_addr=new_remote_addr[version],
    139           o_key=new_okey,
    140           i_key=new_ikey,
    141           is_update=True)
    142 
    143       self.verifyVtiInfoData(self.iproute.GetVtiInfoData(_VTI_IFNAME),
    144                              version, local_addr, new_remote_addr[version],
    145                              new_ikey, new_okey)
    146 
    147       if_index = self.iproute.GetIfIndex(_VTI_IFNAME)
    148 
    149       # Validate that the netlink interface matches the ioctl interface.
    150       self.assertEquals(net_test.GetInterfaceIndex(_VTI_IFNAME), if_index)
    151       self.iproute.DeleteLink(_VTI_IFNAME)
    152       with self.assertRaises(IOError):
    153         self.iproute.GetIfIndex(_VTI_IFNAME)
    154 
    155   def _QuietDeleteLink(self, ifname):
    156     try:
    157       self.iproute.DeleteLink(ifname)
    158     except IOError:
    159       # The link was not present.
    160       pass
    161 
    162   def tearDown(self):
    163     super(XfrmAddDeleteVtiTest, self).tearDown()
    164     self._QuietDeleteLink(_VTI_IFNAME)
    165 
    166 
    167 class VtiInterface(object):
    168 
    169   def __init__(self, iface, netid, underlying_netid, local, remote):
    170     self.iface = iface
    171     self.netid = netid
    172     self.underlying_netid = underlying_netid
    173     self.local, self.remote = local, remote
    174     self.rx = self.tx = 0
    175     self.ikey = _TEST_IKEY + netid
    176     self.okey = _TEST_OKEY + netid
    177     self.out_spi = self.in_spi = random.randint(0, 0x7fffffff)
    178 
    179     self.iproute = iproute.IPRoute()
    180     self.xfrm = xfrm.Xfrm()
    181 
    182     self.SetupInterface()
    183     self.SetupXfrm()
    184     self.addrs = {}
    185 
    186   def Teardown(self):
    187     self.TeardownXfrm()
    188     self.TeardownInterface()
    189 
    190   def SetupInterface(self):
    191     self.iproute.CreateVirtualTunnelInterface(
    192         self.iface, self.local, self.remote, self.ikey, self.okey)
    193 
    194   def TeardownInterface(self):
    195     self.iproute.DeleteLink(self.iface)
    196 
    197   def SetupXfrm(self):
    198     # For the VTI, the selectors are wildcard since packets will only
    199     # be selected if they have the appropriate mark, hence the inner
    200     # addresses are wildcard.
    201     self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT, None, self.local, self.remote,
    202                            self.out_spi, xfrm_base._ALGO_CBC_AES_256,
    203                            xfrm_base._ALGO_HMAC_SHA1,
    204                            xfrm.ExactMatchMark(self.okey),
    205                            self.underlying_netid)
    206 
    207     self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN, None, self.remote, self.local,
    208                            self.in_spi, xfrm_base._ALGO_CBC_AES_256,
    209                            xfrm_base._ALGO_HMAC_SHA1,
    210                            xfrm.ExactMatchMark(self.ikey), None)
    211 
    212   def TeardownXfrm(self):
    213     self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_OUT, None, self.remote,
    214                            self.out_spi, self.okey)
    215     self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_IN, None, self.local,
    216                            self.in_spi, self.ikey)
    217 
    218 
    219 @unittest.skipUnless(net_test.LINUX_VERSION >= (3, 18, 0), "VTI Unsupported")
    220 class XfrmVtiTest(xfrm_base.XfrmBaseTest):
    221 
    222   @classmethod
    223   def setUpClass(cls):
    224     xfrm_base.XfrmBaseTest.setUpClass()
    225     # VTI interfaces use marks extensively, so configure realistic packet
    226     # marking rules to make the test representative, make PMTUD work, etc.
    227     cls.SetInboundMarks(True)
    228     cls.SetMarkReflectSysctls(1)
    229 
    230     cls.vtis = {}
    231     for i, underlying_netid in enumerate(cls.tuns):
    232       for version in 4, 6:
    233         netid = _BASE_VTI_NETID[version] + i
    234         iface = "ipsec%s" % netid
    235         local = cls.MyAddress(version, underlying_netid)
    236         if version == 4:
    237           remote = net_test.IPV4_ADDR2 if (i % 2) else net_test.IPV4_ADDR
    238         else:
    239           remote = net_test.IPV6_ADDR2 if (i % 2) else net_test.IPV6_ADDR
    240         vti = VtiInterface(iface, netid, underlying_netid, local, remote)
    241         cls._SetInboundMarking(netid, iface, True)
    242         cls._SetupVtiNetwork(vti, True)
    243         cls.vtis[netid] = vti
    244 
    245   @classmethod
    246   def tearDownClass(cls):
    247     # The sysctls are restored by MultinetworkBaseTest.tearDownClass.
    248     cls.SetInboundMarks(False)
    249     for vti in cls.vtis.values():
    250       cls._SetInboundMarking(vti.netid, vti.iface, False)
    251       cls._SetupVtiNetwork(vti, False)
    252       vti.Teardown()
    253     xfrm_base.XfrmBaseTest.tearDownClass()
    254 
    255   def setUp(self):
    256     multinetwork_base.MultiNetworkBaseTest.setUp(self)
    257     self.iproute = iproute.IPRoute()
    258 
    259   def tearDown(self):
    260     multinetwork_base.MultiNetworkBaseTest.tearDown(self)
    261 
    262   def _SwapInterfaceAddress(self, ifname, old_addr, new_addr):
    263     """Exchange two addresses on a given interface.
    264 
    265     Args:
    266       ifname: Name of the interface
    267       old_addr: An address to be removed from the interface
    268       new_addr: An address to be added to an interface
    269     """
    270     version = 6 if ":" in new_addr else 4
    271     ifindex = net_test.GetInterfaceIndex(ifname)
    272     self.iproute.AddAddress(new_addr,
    273                             net_test.AddressLengthBits(version), ifindex)
    274     self.iproute.DelAddress(old_addr,
    275                             net_test.AddressLengthBits(version), ifindex)
    276 
    277   @classmethod
    278   def _SetupVtiNetwork(cls, vti, is_add):
    279     """Setup rules and routes for a VTI Network.
    280 
    281     Takes an interface and depending on the boolean
    282     value of is_add, either adds or removes the rules
    283     and routes for a VTI to behave like an Android
    284     Network for purposes of testing.
    285 
    286     Args:
    287       vti: A VtiInterface, the VTI to set up.
    288       is_add: Boolean that causes this method to perform setup if True or
    289         teardown if False
    290     """
    291     if is_add:
    292       # Disable router solicitations to avoid occasional spurious packets
    293       # arriving on the underlying network; there are two possible behaviors
    294       # when that occurred: either only the RA packet is read, and when it
    295       # is echoed back to the VTI, it causes the test to fail by not receiving
    296       # the UDP_PAYLOAD; or, two packets may arrive on the underlying
    297       # network which fails the assertion that only one ESP packet is received.
    298       cls.SetSysctl(
    299           "/proc/sys/net/ipv6/conf/%s/router_solicitations" % vti.iface, 0)
    300       net_test.SetInterfaceUp(vti.iface)
    301 
    302     for version in [4, 6]:
    303       ifindex = net_test.GetInterfaceIndex(vti.iface)
    304       table = vti.netid
    305 
    306       # Set up routing rules.
    307       start, end = cls.UidRangeForNetid(vti.netid)
    308       cls.iproute.UidRangeRule(version, is_add, start, end, table,
    309                                 cls.PRIORITY_UID)
    310       cls.iproute.OifRule(version, is_add, vti.iface, table, cls.PRIORITY_OIF)
    311       cls.iproute.FwmarkRule(version, is_add, vti.netid, cls.NETID_FWMASK,
    312                               table, cls.PRIORITY_FWMARK)
    313 
    314       # Configure IP addresses.
    315       if version == 4:
    316         addr = cls._MyIPv4Address(vti.netid)
    317       else:
    318         addr = cls.OnlinkPrefix(6, vti.netid) + "1"
    319       prefixlen = net_test.AddressLengthBits(version)
    320       vti.addrs[version] = addr
    321       if is_add:
    322         cls.iproute.AddAddress(addr, prefixlen, ifindex)
    323         cls.iproute.AddRoute(version, table, "default", 0, None, ifindex)
    324       else:
    325         cls.iproute.DelRoute(version, table, "default", 0, None, ifindex)
    326         cls.iproute.DelAddress(addr, prefixlen, ifindex)
    327 
    328   def assertReceivedPacket(self, vti):
    329     vti.rx += 1
    330     self.assertEquals((vti.rx, vti.tx), self.iproute.GetRxTxPackets(vti.iface))
    331 
    332   def assertSentPacket(self, vti):
    333     vti.tx += 1
    334     self.assertEquals((vti.rx, vti.tx), self.iproute.GetRxTxPackets(vti.iface))
    335 
    336   # TODO: Should we completely re-write this using null encryption and null
    337   # authentication? We could then assemble and disassemble packets for each
    338   # direction individually. This approach would improve debuggability, avoid the
    339   # complexity of the twister, and allow the test to more-closely validate
    340   # deployable configurations.
    341   def _CheckVtiInputOutput(self, vti, inner_version):
    342     local_outer = vti.local
    343     remote_outer = vti.remote
    344 
    345     # Create a socket to receive packets.
    346     read_sock = socket(
    347         net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0)
    348     read_sock.bind((net_test.GetWildcardAddress(inner_version), 0))
    349     # The second parameter of the tuple is the port number regardless of AF.
    350     port = read_sock.getsockname()[1]
    351     # Guard against the eventuality of the receive failing.
    352     csocket.SetSocketTimeout(read_sock, 100)
    353 
    354     # Send a packet out via the vti-backed network, bound for the port number
    355     # of the input socket.
    356     write_sock = socket(
    357         net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0)
    358     self.SelectInterface(write_sock, vti.netid, "mark")
    359     write_sock.sendto(net_test.UDP_PAYLOAD,
    360                       (_GetRemoteInnerAddress(inner_version), port))
    361 
    362     # Read a tunneled IP packet on the underlying (outbound) network
    363     # verifying that it is an ESP packet.
    364     self.assertSentPacket(vti)
    365     pkt = self._ExpectEspPacketOn(vti.underlying_netid, vti.out_spi, vti.tx, None,
    366                                   local_outer, remote_outer)
    367 
    368     # Perform an address switcheroo so that the inner address of the remote
    369     # end of the tunnel is now the address on the local VTI interface; this
    370     # way, the twisted inner packet finds a destination via the VTI once
    371     # decrypted.
    372     remote = _GetRemoteInnerAddress(inner_version)
    373     local = vti.addrs[inner_version]
    374     self._SwapInterfaceAddress(vti.iface, new_addr=remote, old_addr=local)
    375     try:
    376       # Swap the packet's IP headers and write it back to the
    377       # underlying network.
    378       pkt = TunTwister.TwistPacket(pkt)
    379       self.ReceivePacketOn(vti.underlying_netid, pkt)
    380       self.assertReceivedPacket(vti)
    381       # Receive the decrypted packet on the dest port number.
    382       read_packet = read_sock.recv(4096)
    383       self.assertEquals(read_packet, net_test.UDP_PAYLOAD)
    384     finally:
    385       # Unwind the switcheroo
    386       self._SwapInterfaceAddress(vti.iface, new_addr=local, old_addr=remote)
    387 
    388     # Now attempt to provoke an ICMP error.
    389     # TODO: deduplicate with multinetwork_test.py.
    390     version = net_test.GetAddressVersion(vti.remote)
    391     dst_prefix, intermediate = {
    392         4: ("172.19.", "172.16.9.12"),
    393         6: ("2001:db8::", "2001:db8::1")
    394     }[version]
    395 
    396     write_sock.sendto(net_test.UDP_PAYLOAD,
    397                       (_GetRemoteInnerAddress(inner_version), port))
    398     self.assertSentPacket(vti)
    399     pkt = self._ExpectEspPacketOn(vti.underlying_netid, vti.out_spi, vti.tx, None,
    400                                   local_outer, remote_outer)
    401     myaddr = self.MyAddress(version, vti.underlying_netid)
    402     _, toobig = packets.ICMPPacketTooBig(version, intermediate, myaddr, pkt)
    403     self.ReceivePacketOn(vti.underlying_netid, toobig)
    404 
    405     # Check that the packet too big reduced the MTU.
    406     routes = self.iproute.GetRoutes(vti.remote, 0, vti.underlying_netid, None)
    407     self.assertEquals(1, len(routes))
    408     rtmsg, attributes = routes[0]
    409     self.assertEquals(iproute.RTN_UNICAST, rtmsg.type)
    410     self.assertEquals(packets.PTB_MTU, attributes["RTA_METRICS"]["RTAX_MTU"])
    411 
    412     # Clear PMTU information so that future tests don't have to worry about it.
    413     self.InvalidateDstCache(version, vti.underlying_netid)
    414 
    415   def testVtiInputOutput(self):
    416     """Test packet input and output over a Virtual Tunnel Interface."""
    417     for i in xrange(3 * len(self.vtis.values())):
    418       vti = random.choice(self.vtis.values())
    419       self._CheckVtiInputOutput(vti, 4)
    420       self._CheckVtiInputOutput(vti, 6)
    421 
    422 
    423 if __name__ == "__main__":
    424   unittest.main()
    425