1 #!/usr/bin/env python 2 # Copyright 2011 Google Inc. All Rights Reserved. 3 # 4 # Licensed under the Apache License, Version 2.0 (the "License"); 5 # you may not use this file except in compliance with the License. 6 # You may obtain a copy of the License at 7 # 8 # http://www.apache.org/licenses/LICENSE-2.0 9 # 10 # Unless required by applicable law or agreed to in writing, software 11 # distributed under the License is distributed on an "AS IS" BASIS, 12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 # See the License for the specific language governing permissions and 14 # limitations under the License. 15 16 """System integration test for traffic shaping. 17 18 Usage: 19 $ sudo ./trafficshaper_test.py 20 """ 21 22 import daemonserver 23 import logging 24 import platformsettings 25 import socket 26 import SocketServer 27 import trafficshaper 28 import unittest 29 30 RESPONSE_SIZE_KEY = 'response-size:' 31 TEST_DNS_PORT = 5555 32 TEST_HTTP_PORT = 8888 33 TIMER = platformsettings.timer 34 35 36 def GetElapsedMs(start_time, end_time): 37 """Return milliseconds elapsed between |start_time| and |end_time|. 38 39 Args: 40 start_time: seconds as a float (or string representation of float). 41 end_time: seconds as a float (or string representation of float). 42 Return: 43 milliseconds elapsed as integer. 44 """ 45 return int((float(end_time) - float(start_time)) * 1000) 46 47 48 class TrafficShaperTest(unittest.TestCase): 49 50 def testBadBandwidthRaises(self): 51 self.assertRaises(trafficshaper.BandwidthValueError, 52 trafficshaper.TrafficShaper, 53 down_bandwidth='1KBit/s') 54 55 56 class TimedUdpHandler(SocketServer.DatagramRequestHandler): 57 """UDP handler that returns the time when the request was handled.""" 58 59 def handle(self): 60 data = self.rfile.read() 61 read_time = self.server.timer() 62 self.wfile.write(str(read_time)) 63 64 65 class TimedTcpHandler(SocketServer.StreamRequestHandler): 66 """Tcp handler that returns the time when the request was read. 67 68 It can respond with the number of bytes specified in the request. 69 The request looks like: 70 request_data -> RESPONSE_SIZE_KEY num_response_bytes '\n' ANY_DATA 71 """ 72 73 def handle(self): 74 data = self.rfile.read() 75 read_time = self.server.timer() 76 contents = str(read_time) 77 if data.startswith(RESPONSE_SIZE_KEY): 78 num_response_bytes = int(data[len(RESPONSE_SIZE_KEY):data.index('\n')]) 79 contents = '%s\n%s' % (contents, 80 '\x00' * (num_response_bytes - len(contents) - 1)) 81 self.wfile.write(contents) 82 83 84 class TimedUdpServer(SocketServer.ThreadingUDPServer, 85 daemonserver.DaemonServer): 86 """A simple UDP server similar to dnsproxy.""" 87 88 # Override SocketServer.TcpServer setting to avoid intermittent errors. 89 allow_reuse_address = True 90 91 def __init__(self, host, port, timer=TIMER): 92 SocketServer.ThreadingUDPServer.__init__( 93 self, (host, port), TimedUdpHandler) 94 self.timer = timer 95 96 def cleanup(self): 97 pass 98 99 100 class TimedTcpServer(SocketServer.ThreadingTCPServer, 101 daemonserver.DaemonServer): 102 """A simple TCP server similar to httpproxy.""" 103 104 # Override SocketServer.TcpServer setting to avoid intermittent errors. 105 allow_reuse_address = True 106 107 def __init__(self, host, port, timer=TIMER): 108 SocketServer.ThreadingTCPServer.__init__( 109 self, (host, port), TimedTcpHandler) 110 self.timer = timer 111 112 def cleanup(self): 113 try: 114 self.shutdown() 115 except KeyboardInterrupt, e: 116 pass 117 118 119 class TcpTestSocketCreator(object): 120 """A TCP socket creator suitable for with-statement.""" 121 122 def __init__(self, host, port, timeout=1.0): 123 self.address = (host, port) 124 self.timeout = timeout 125 126 def __enter__(self): 127 self.socket = socket.create_connection(self.address, timeout=self.timeout) 128 return self.socket 129 130 def __exit__(self, *args): 131 self.socket.close() 132 133 134 class TimedTestCase(unittest.TestCase): 135 def assertValuesAlmostEqual(self, expected, actual, tolerance=0.05): 136 """Like the following with nicer default message: 137 assertTrue(expected <= actual + tolerance && 138 expected >= actual - tolerance) 139 """ 140 delta = tolerance * expected 141 if actual > expected + delta or actual < expected - delta: 142 self.fail('%s is not equal to expected %s +/- %s%%' % ( 143 actual, expected, 100 * tolerance)) 144 145 146 class TcpTrafficShaperTest(TimedTestCase): 147 148 def setUp(self): 149 self.host = platformsettings.get_server_ip_address() 150 self.port = TEST_HTTP_PORT 151 self.tcp_socket_creator = TcpTestSocketCreator(self.host, self.port) 152 self.timer = TIMER 153 154 def TrafficShaper(self, **kwargs): 155 return trafficshaper.TrafficShaper( 156 host=self.host, ports=(self.port,), **kwargs) 157 158 def GetTcpSendTimeMs(self, num_bytes): 159 """Return time in milliseconds to send |num_bytes|.""" 160 161 with self.tcp_socket_creator as s: 162 start_time = self.timer() 163 request_data = '\x00' * num_bytes 164 165 s.sendall(request_data) 166 # TODO(slamm): Figure out why partial is shutdown needed to make it work. 167 s.shutdown(socket.SHUT_WR) 168 read_time = s.recv(1024) 169 return GetElapsedMs(start_time, read_time) 170 171 def GetTcpReceiveTimeMs(self, num_bytes): 172 """Return time in milliseconds to receive |num_bytes|.""" 173 174 with self.tcp_socket_creator as s: 175 s.sendall('%s%s\n' % (RESPONSE_SIZE_KEY, num_bytes)) 176 # TODO(slamm): Figure out why partial is shutdown needed to make it work. 177 s.shutdown(socket.SHUT_WR) 178 num_remaining_bytes = num_bytes 179 read_time = None 180 while num_remaining_bytes > 0: 181 response_data = s.recv(4096) 182 num_remaining_bytes -= len(response_data) 183 if not read_time: 184 read_time, padding = response_data.split('\n') 185 return GetElapsedMs(read_time, self.timer()) 186 187 def testTcpConnectToIp(self): 188 """Verify that it takes |delay_ms| to establish a TCP connection.""" 189 if not platformsettings.has_ipfw(): 190 logging.warning('ipfw is not available in path. Skip the test') 191 return 192 with TimedTcpServer(self.host, self.port): 193 for delay_ms in (100, 175): 194 with self.TrafficShaper(delay_ms=delay_ms): 195 start_time = self.timer() 196 with self.tcp_socket_creator: 197 connect_time = GetElapsedMs(start_time, self.timer()) 198 self.assertValuesAlmostEqual(delay_ms, connect_time, tolerance=0.12) 199 200 def testTcpUploadShaping(self): 201 """Verify that 'up' bandwidth is shaped on TCP connections.""" 202 if not platformsettings.has_ipfw(): 203 logging.warning('ipfw is not available in path. Skip the test') 204 return 205 num_bytes = 1024 * 100 206 bandwidth_kbits = 2000 207 expected_ms = 8.0 * num_bytes / bandwidth_kbits 208 with TimedTcpServer(self.host, self.port): 209 with self.TrafficShaper(up_bandwidth='%sKbit/s' % bandwidth_kbits): 210 self.assertValuesAlmostEqual(expected_ms, self.GetTcpSendTimeMs(num_bytes)) 211 212 def testTcpDownloadShaping(self): 213 """Verify that 'down' bandwidth is shaped on TCP connections.""" 214 if not platformsettings.has_ipfw(): 215 logging.warning('ipfw is not available in path. Skip the test') 216 return 217 num_bytes = 1024 * 100 218 bandwidth_kbits = 2000 219 expected_ms = 8.0 * num_bytes / bandwidth_kbits 220 with TimedTcpServer(self.host, self.port): 221 with self.TrafficShaper(down_bandwidth='%sKbit/s' % bandwidth_kbits): 222 self.assertValuesAlmostEqual(expected_ms, self.GetTcpReceiveTimeMs(num_bytes)) 223 224 def testTcpInterleavedDownloads(self): 225 # TODO(slamm): write tcp interleaved downloads test 226 pass 227 228 229 class UdpTrafficShaperTest(TimedTestCase): 230 231 def setUp(self): 232 self.host = platformsettings.get_server_ip_address() 233 self.dns_port = TEST_DNS_PORT 234 self.timer = TIMER 235 236 def TrafficShaper(self, **kwargs): 237 return trafficshaper.TrafficShaper( 238 host=self.host, ports=(self.dns_port,), **kwargs) 239 240 def GetUdpSendReceiveTimesMs(self): 241 """Return time in milliseconds to send |num_bytes|.""" 242 start_time = self.timer() 243 udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 244 udp_socket.sendto('test data\n', (self.host, self.dns_port)) 245 read_time = udp_socket.recv(1024) 246 return (GetElapsedMs(start_time, read_time), 247 GetElapsedMs(read_time, self.timer())) 248 249 def testUdpDelay(self): 250 if not platformsettings.has_ipfw(): 251 logging.warning('ipfw is not available in path. Skip the test') 252 return 253 for delay_ms in (100, 170): 254 expected_ms = delay_ms / 2 255 with TimedUdpServer(self.host, self.dns_port): 256 with self.TrafficShaper(delay_ms=delay_ms): 257 send_ms, receive_ms = self.GetUdpSendReceiveTimesMs() 258 self.assertValuesAlmostEqual(expected_ms, send_ms, tolerance=0.10) 259 self.assertValuesAlmostEqual(expected_ms, receive_ms, tolerance=0.10) 260 261 262 def testUdpInterleavedDelay(self): 263 # TODO(slamm): write udp interleaved udp delay test 264 pass 265 266 267 class TcpAndUdpTrafficShaperTest(TimedTestCase): 268 # TODO(slamm): Test concurrent TCP and UDP traffic 269 pass 270 271 272 # TODO(slamm): Packet loss rate (try different ports) 273 274 275 if __name__ == '__main__': 276 #logging.getLogger().setLevel(logging.DEBUG) 277 unittest.main() 278