1 #!/usr/bin/python 2 # 3 # Copyright 2017 The Android Open Source Project 4 # 5 # Licensed under the Apache License, Version 2.0 (the "License"); 6 # you may not use this file except in compliance with the License. 7 # You may obtain a copy of the License at 8 # 9 # http://www.apache.org/licenses/LICENSE-2.0 10 # 11 # Unless required by applicable law or agreed to in writing, software 12 # distributed under the License is distributed on an "AS IS" BASIS, 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 # See the License for the specific language governing permissions and 15 # limitations under the License. 16 17 # pylint: disable=g-bad-todo,g-bad-file-header,wildcard-import 18 from errno import * # pylint: disable=wildcard-import 19 from socket import * # pylint: disable=wildcard-import 20 21 import random 22 import struct 23 import unittest 24 25 from tun_twister import TunTwister 26 import csocket 27 import iproute 28 import multinetwork_base 29 import net_test 30 import packets 31 import xfrm 32 import xfrm_base 33 34 # Parameters to Set up VTI as a special network 35 _BASE_VTI_NETID = {4: 40, 6: 60} 36 _BASE_VTI_OKEY = 2000000100 37 _BASE_VTI_IKEY = 2000000200 38 39 _VTI_NETID = 50 40 _VTI_IFNAME = "test_vti" 41 42 _TEST_OUT_SPI = 0x1234 43 _TEST_IN_SPI = _TEST_OUT_SPI 44 45 _TEST_OKEY = 2000000100 46 _TEST_IKEY = 2000000200 47 48 49 def _GetLocalInnerAddress(version): 50 return {4: "10.16.5.15", 6: "2001:db8:1::1"}[version] 51 52 53 def _GetRemoteInnerAddress(version): 54 return {4: "10.16.5.20", 6: "2001:db8:2::1"}[version] 55 56 57 def _GetRemoteOuterAddress(version): 58 return {4: net_test.IPV4_ADDR, 6: net_test.IPV6_ADDR}[version] 59 60 61 class XfrmTunnelTest(xfrm_base.XfrmLazyTest): 62 63 def _CheckTunnelOutput(self, inner_version, outer_version): 64 """Test a bi-directional XFRM Tunnel with explicit selectors""" 65 # Select the underlying netid, which represents the external 66 # interface from/to which to route ESP packets. 67 underlying_netid = self.RandomNetid() 68 # Select a random netid that will originate traffic locally and 69 # which represents the logical tunnel network. 70 netid = self.RandomNetid(exclude=underlying_netid) 71 72 local_inner = self.MyAddress(inner_version, netid) 73 remote_inner = _GetRemoteInnerAddress(inner_version) 74 local_outer = self.MyAddress(outer_version, underlying_netid) 75 remote_outer = _GetRemoteOuterAddress(outer_version) 76 77 self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT, 78 xfrm.SrcDstSelector(local_inner, remote_inner), 79 local_outer, remote_outer, _TEST_OUT_SPI, 80 xfrm_base._ALGO_CBC_AES_256, 81 xfrm_base._ALGO_HMAC_SHA1, 82 None, underlying_netid) 83 84 write_sock = socket(net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0) 85 # Select an interface, which provides the source address of the inner 86 # packet. 87 self.SelectInterface(write_sock, netid, "mark") 88 write_sock.sendto(net_test.UDP_PAYLOAD, (remote_inner, 53)) 89 self._ExpectEspPacketOn(underlying_netid, _TEST_OUT_SPI, 1, None, 90 local_outer, remote_outer) 91 92 # TODO: Add support for the input path. 93 94 def testIpv4InIpv4TunnelOutput(self): 95 self._CheckTunnelOutput(4, 4) 96 97 def testIpv4InIpv6TunnelOutput(self): 98 self._CheckTunnelOutput(4, 6) 99 100 def testIpv6InIpv4TunnelOutput(self): 101 self._CheckTunnelOutput(6, 4) 102 103 def testIpv6InIpv6TunnelOutput(self): 104 self._CheckTunnelOutput(6, 6) 105 106 107 @unittest.skipUnless(net_test.LINUX_VERSION >= (3, 18, 0), "VTI Unsupported") 108 class XfrmAddDeleteVtiTest(xfrm_base.XfrmBaseTest): 109 def verifyVtiInfoData(self, vti_info_data, version, local_addr, remote_addr, ikey, okey): 110 self.assertEquals(vti_info_data["IFLA_VTI_IKEY"], ikey) 111 self.assertEquals(vti_info_data["IFLA_VTI_OKEY"], okey) 112 113 family = AF_INET if version == 4 else AF_INET6 114 self.assertEquals(inet_ntop(family, vti_info_data["IFLA_VTI_LOCAL"]), local_addr) 115 self.assertEquals(inet_ntop(family, vti_info_data["IFLA_VTI_REMOTE"]), remote_addr) 116 117 def testAddVti(self): 118 """Test the creation of a Virtual Tunnel Interface.""" 119 for version in [4, 6]: 120 netid = self.RandomNetid() 121 local_addr = self.MyAddress(version, netid) 122 self.iproute.CreateVirtualTunnelInterface( 123 dev_name=_VTI_IFNAME, 124 local_addr=local_addr, 125 remote_addr=_GetRemoteOuterAddress(version), 126 o_key=_TEST_OKEY, 127 i_key=_TEST_IKEY) 128 self.verifyVtiInfoData(self.iproute.GetVtiInfoData(_VTI_IFNAME), 129 version, local_addr, _GetRemoteOuterAddress(version), 130 _TEST_IKEY, _TEST_OKEY) 131 132 new_remote_addr = {4: net_test.IPV4_ADDR2, 6: net_test.IPV6_ADDR2} 133 new_okey = _TEST_OKEY + _VTI_NETID 134 new_ikey = _TEST_IKEY + _VTI_NETID 135 self.iproute.CreateVirtualTunnelInterface( 136 dev_name=_VTI_IFNAME, 137 local_addr=local_addr, 138 remote_addr=new_remote_addr[version], 139 o_key=new_okey, 140 i_key=new_ikey, 141 is_update=True) 142 143 self.verifyVtiInfoData(self.iproute.GetVtiInfoData(_VTI_IFNAME), 144 version, local_addr, new_remote_addr[version], 145 new_ikey, new_okey) 146 147 if_index = self.iproute.GetIfIndex(_VTI_IFNAME) 148 149 # Validate that the netlink interface matches the ioctl interface. 150 self.assertEquals(net_test.GetInterfaceIndex(_VTI_IFNAME), if_index) 151 self.iproute.DeleteLink(_VTI_IFNAME) 152 with self.assertRaises(IOError): 153 self.iproute.GetIfIndex(_VTI_IFNAME) 154 155 def _QuietDeleteLink(self, ifname): 156 try: 157 self.iproute.DeleteLink(ifname) 158 except IOError: 159 # The link was not present. 160 pass 161 162 def tearDown(self): 163 super(XfrmAddDeleteVtiTest, self).tearDown() 164 self._QuietDeleteLink(_VTI_IFNAME) 165 166 167 class VtiInterface(object): 168 169 def __init__(self, iface, netid, underlying_netid, local, remote): 170 self.iface = iface 171 self.netid = netid 172 self.underlying_netid = underlying_netid 173 self.local, self.remote = local, remote 174 self.rx = self.tx = 0 175 self.ikey = _TEST_IKEY + netid 176 self.okey = _TEST_OKEY + netid 177 self.out_spi = self.in_spi = random.randint(0, 0x7fffffff) 178 179 self.iproute = iproute.IPRoute() 180 self.xfrm = xfrm.Xfrm() 181 182 self.SetupInterface() 183 self.SetupXfrm() 184 self.addrs = {} 185 186 def Teardown(self): 187 self.TeardownXfrm() 188 self.TeardownInterface() 189 190 def SetupInterface(self): 191 self.iproute.CreateVirtualTunnelInterface( 192 self.iface, self.local, self.remote, self.ikey, self.okey) 193 194 def TeardownInterface(self): 195 self.iproute.DeleteLink(self.iface) 196 197 def SetupXfrm(self): 198 # For the VTI, the selectors are wildcard since packets will only 199 # be selected if they have the appropriate mark, hence the inner 200 # addresses are wildcard. 201 self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT, None, self.local, self.remote, 202 self.out_spi, xfrm_base._ALGO_CBC_AES_256, 203 xfrm_base._ALGO_HMAC_SHA1, 204 xfrm.ExactMatchMark(self.okey), 205 self.underlying_netid) 206 207 self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN, None, self.remote, self.local, 208 self.in_spi, xfrm_base._ALGO_CBC_AES_256, 209 xfrm_base._ALGO_HMAC_SHA1, 210 xfrm.ExactMatchMark(self.ikey), None) 211 212 def TeardownXfrm(self): 213 self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_OUT, None, self.remote, 214 self.out_spi, self.okey) 215 self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_IN, None, self.local, 216 self.in_spi, self.ikey) 217 218 219 @unittest.skipUnless(net_test.LINUX_VERSION >= (3, 18, 0), "VTI Unsupported") 220 class XfrmVtiTest(xfrm_base.XfrmBaseTest): 221 222 @classmethod 223 def setUpClass(cls): 224 xfrm_base.XfrmBaseTest.setUpClass() 225 # VTI interfaces use marks extensively, so configure realistic packet 226 # marking rules to make the test representative, make PMTUD work, etc. 227 cls.SetInboundMarks(True) 228 cls.SetMarkReflectSysctls(1) 229 230 cls.vtis = {} 231 for i, underlying_netid in enumerate(cls.tuns): 232 for version in 4, 6: 233 netid = _BASE_VTI_NETID[version] + i 234 iface = "ipsec%s" % netid 235 local = cls.MyAddress(version, underlying_netid) 236 if version == 4: 237 remote = net_test.IPV4_ADDR2 if (i % 2) else net_test.IPV4_ADDR 238 else: 239 remote = net_test.IPV6_ADDR2 if (i % 2) else net_test.IPV6_ADDR 240 vti = VtiInterface(iface, netid, underlying_netid, local, remote) 241 cls._SetInboundMarking(netid, iface, True) 242 cls._SetupVtiNetwork(vti, True) 243 cls.vtis[netid] = vti 244 245 @classmethod 246 def tearDownClass(cls): 247 # The sysctls are restored by MultinetworkBaseTest.tearDownClass. 248 cls.SetInboundMarks(False) 249 for vti in cls.vtis.values(): 250 cls._SetInboundMarking(vti.netid, vti.iface, False) 251 cls._SetupVtiNetwork(vti, False) 252 vti.Teardown() 253 xfrm_base.XfrmBaseTest.tearDownClass() 254 255 def setUp(self): 256 multinetwork_base.MultiNetworkBaseTest.setUp(self) 257 self.iproute = iproute.IPRoute() 258 259 def tearDown(self): 260 multinetwork_base.MultiNetworkBaseTest.tearDown(self) 261 262 def _SwapInterfaceAddress(self, ifname, old_addr, new_addr): 263 """Exchange two addresses on a given interface. 264 265 Args: 266 ifname: Name of the interface 267 old_addr: An address to be removed from the interface 268 new_addr: An address to be added to an interface 269 """ 270 version = 6 if ":" in new_addr else 4 271 ifindex = net_test.GetInterfaceIndex(ifname) 272 self.iproute.AddAddress(new_addr, 273 net_test.AddressLengthBits(version), ifindex) 274 self.iproute.DelAddress(old_addr, 275 net_test.AddressLengthBits(version), ifindex) 276 277 @classmethod 278 def _SetupVtiNetwork(cls, vti, is_add): 279 """Setup rules and routes for a VTI Network. 280 281 Takes an interface and depending on the boolean 282 value of is_add, either adds or removes the rules 283 and routes for a VTI to behave like an Android 284 Network for purposes of testing. 285 286 Args: 287 vti: A VtiInterface, the VTI to set up. 288 is_add: Boolean that causes this method to perform setup if True or 289 teardown if False 290 """ 291 if is_add: 292 # Disable router solicitations to avoid occasional spurious packets 293 # arriving on the underlying network; there are two possible behaviors 294 # when that occurred: either only the RA packet is read, and when it 295 # is echoed back to the VTI, it causes the test to fail by not receiving 296 # the UDP_PAYLOAD; or, two packets may arrive on the underlying 297 # network which fails the assertion that only one ESP packet is received. 298 cls.SetSysctl( 299 "/proc/sys/net/ipv6/conf/%s/router_solicitations" % vti.iface, 0) 300 net_test.SetInterfaceUp(vti.iface) 301 302 for version in [4, 6]: 303 ifindex = net_test.GetInterfaceIndex(vti.iface) 304 table = vti.netid 305 306 # Set up routing rules. 307 start, end = cls.UidRangeForNetid(vti.netid) 308 cls.iproute.UidRangeRule(version, is_add, start, end, table, 309 cls.PRIORITY_UID) 310 cls.iproute.OifRule(version, is_add, vti.iface, table, cls.PRIORITY_OIF) 311 cls.iproute.FwmarkRule(version, is_add, vti.netid, cls.NETID_FWMASK, 312 table, cls.PRIORITY_FWMARK) 313 314 # Configure IP addresses. 315 if version == 4: 316 addr = cls._MyIPv4Address(vti.netid) 317 else: 318 addr = cls.OnlinkPrefix(6, vti.netid) + "1" 319 prefixlen = net_test.AddressLengthBits(version) 320 vti.addrs[version] = addr 321 if is_add: 322 cls.iproute.AddAddress(addr, prefixlen, ifindex) 323 cls.iproute.AddRoute(version, table, "default", 0, None, ifindex) 324 else: 325 cls.iproute.DelRoute(version, table, "default", 0, None, ifindex) 326 cls.iproute.DelAddress(addr, prefixlen, ifindex) 327 328 def assertReceivedPacket(self, vti): 329 vti.rx += 1 330 self.assertEquals((vti.rx, vti.tx), self.iproute.GetRxTxPackets(vti.iface)) 331 332 def assertSentPacket(self, vti): 333 vti.tx += 1 334 self.assertEquals((vti.rx, vti.tx), self.iproute.GetRxTxPackets(vti.iface)) 335 336 # TODO: Should we completely re-write this using null encryption and null 337 # authentication? We could then assemble and disassemble packets for each 338 # direction individually. This approach would improve debuggability, avoid the 339 # complexity of the twister, and allow the test to more-closely validate 340 # deployable configurations. 341 def _CheckVtiInputOutput(self, vti, inner_version): 342 local_outer = vti.local 343 remote_outer = vti.remote 344 345 # Create a socket to receive packets. 346 read_sock = socket( 347 net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0) 348 read_sock.bind((net_test.GetWildcardAddress(inner_version), 0)) 349 # The second parameter of the tuple is the port number regardless of AF. 350 port = read_sock.getsockname()[1] 351 # Guard against the eventuality of the receive failing. 352 csocket.SetSocketTimeout(read_sock, 100) 353 354 # Send a packet out via the vti-backed network, bound for the port number 355 # of the input socket. 356 write_sock = socket( 357 net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0) 358 self.SelectInterface(write_sock, vti.netid, "mark") 359 write_sock.sendto(net_test.UDP_PAYLOAD, 360 (_GetRemoteInnerAddress(inner_version), port)) 361 362 # Read a tunneled IP packet on the underlying (outbound) network 363 # verifying that it is an ESP packet. 364 self.assertSentPacket(vti) 365 pkt = self._ExpectEspPacketOn(vti.underlying_netid, vti.out_spi, vti.tx, None, 366 local_outer, remote_outer) 367 368 # Perform an address switcheroo so that the inner address of the remote 369 # end of the tunnel is now the address on the local VTI interface; this 370 # way, the twisted inner packet finds a destination via the VTI once 371 # decrypted. 372 remote = _GetRemoteInnerAddress(inner_version) 373 local = vti.addrs[inner_version] 374 self._SwapInterfaceAddress(vti.iface, new_addr=remote, old_addr=local) 375 try: 376 # Swap the packet's IP headers and write it back to the 377 # underlying network. 378 pkt = TunTwister.TwistPacket(pkt) 379 self.ReceivePacketOn(vti.underlying_netid, pkt) 380 self.assertReceivedPacket(vti) 381 # Receive the decrypted packet on the dest port number. 382 read_packet = read_sock.recv(4096) 383 self.assertEquals(read_packet, net_test.UDP_PAYLOAD) 384 finally: 385 # Unwind the switcheroo 386 self._SwapInterfaceAddress(vti.iface, new_addr=local, old_addr=remote) 387 388 # Now attempt to provoke an ICMP error. 389 # TODO: deduplicate with multinetwork_test.py. 390 version = net_test.GetAddressVersion(vti.remote) 391 dst_prefix, intermediate = { 392 4: ("172.19.", "172.16.9.12"), 393 6: ("2001:db8::", "2001:db8::1") 394 }[version] 395 396 write_sock.sendto(net_test.UDP_PAYLOAD, 397 (_GetRemoteInnerAddress(inner_version), port)) 398 self.assertSentPacket(vti) 399 pkt = self._ExpectEspPacketOn(vti.underlying_netid, vti.out_spi, vti.tx, None, 400 local_outer, remote_outer) 401 myaddr = self.MyAddress(version, vti.underlying_netid) 402 _, toobig = packets.ICMPPacketTooBig(version, intermediate, myaddr, pkt) 403 self.ReceivePacketOn(vti.underlying_netid, toobig) 404 405 # Check that the packet too big reduced the MTU. 406 routes = self.iproute.GetRoutes(vti.remote, 0, vti.underlying_netid, None) 407 self.assertEquals(1, len(routes)) 408 rtmsg, attributes = routes[0] 409 self.assertEquals(iproute.RTN_UNICAST, rtmsg.type) 410 self.assertEquals(packets.PTB_MTU, attributes["RTA_METRICS"]["RTAX_MTU"]) 411 412 # Clear PMTU information so that future tests don't have to worry about it. 413 self.InvalidateDstCache(version, vti.underlying_netid) 414 415 def testVtiInputOutput(self): 416 """Test packet input and output over a Virtual Tunnel Interface.""" 417 for i in xrange(3 * len(self.vtis.values())): 418 vti = random.choice(self.vtis.values()) 419 self._CheckVtiInputOutput(vti, 4) 420 self._CheckVtiInputOutput(vti, 6) 421 422 423 if __name__ == "__main__": 424 unittest.main() 425