1 #!/usr/bin/env python 2 # Copyright 2012 Google Inc. All Rights Reserved. 3 # 4 # Licensed under the Apache License, Version 2.0 (the "License"); 5 # you may not use this file except in compliance with the License. 6 # You may obtain a copy of the License at 7 # 8 # http://www.apache.org/licenses/LICENSE-2.0 9 # 10 # Unless required by applicable law or agreed to in writing, software 11 # distributed under the License is distributed on an "AS IS" BASIS, 12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 # See the License for the specific language governing permissions and 14 # limitations under the License. 15 16 """Unit tests for proxyshaper. 17 18 Usage: 19 $ ./proxyshaper_test.py 20 """ 21 22 import proxyshaper 23 import StringIO 24 import unittest 25 26 27 # pylint: disable=bad-whitespace 28 VALID_RATES = ( 29 # input, expected_bps 30 ( '384Kbit/s', 384000), 31 ('1536Kbit/s', 1536000), 32 ( '1Mbit/s', 1000000), 33 ( '5Mbit/s', 5000000), 34 ( '2MByte/s', 16000000), 35 ( '0', 0), 36 ( '5', 5), 37 ( 384000, 384000), 38 ) 39 40 ERROR_RATES = ( 41 '1536KBit/s', # Older versions of dummynet used capital 'B' for bytes. 42 '1Mbyte/s', # Require capital 'B' for bytes. 43 '5bps', 44 ) 45 46 47 class TimedTestCase(unittest.TestCase): 48 def assertValuesAlmostEqual(self, expected, actual, tolerance=0.05): 49 """Like the following with nicer default message: 50 assertTrue(expected <= actual + tolerance && 51 expected >= actual - tolerance) 52 """ 53 delta = tolerance * expected 54 if actual > expected + delta or actual < expected - delta: 55 self.fail('%s is not equal to expected %s +/- %s%%' % ( 56 actual, expected, 100 * tolerance)) 57 58 59 class RateLimitedFileTest(TimedTestCase): 60 def testReadLimitedBasic(self): 61 num_bytes = 1024 62 bps = 384000 63 request_counter = lambda: 1 64 f = StringIO.StringIO(' ' * num_bytes) 65 limited_f = proxyshaper.RateLimitedFile(request_counter, f, bps) 66 start = proxyshaper.TIMER() 67 self.assertEqual(num_bytes, len(limited_f.read())) 68 expected_ms = 8.0 * num_bytes / bps * 1000.0 69 actual_ms = (proxyshaper.TIMER() - start) * 1000.0 70 self.assertValuesAlmostEqual(expected_ms, actual_ms) 71 72 def testReadlineLimitedBasic(self): 73 num_bytes = 1024 * 8 + 512 74 bps = 384000 75 request_counter = lambda: 1 76 f = StringIO.StringIO(' ' * num_bytes) 77 limited_f = proxyshaper.RateLimitedFile(request_counter, f, bps) 78 start = proxyshaper.TIMER() 79 self.assertEqual(num_bytes, len(limited_f.readline())) 80 expected_ms = 8.0 * num_bytes / bps * 1000.0 81 actual_ms = (proxyshaper.TIMER() - start) * 1000.0 82 self.assertValuesAlmostEqual(expected_ms, actual_ms) 83 84 def testReadLimitedSlowedByMultipleRequests(self): 85 num_bytes = 1024 86 bps = 384000 87 request_count = 2 88 request_counter = lambda: request_count 89 f = StringIO.StringIO(' ' * num_bytes) 90 limited_f = proxyshaper.RateLimitedFile(request_counter, f, bps) 91 start = proxyshaper.TIMER() 92 num_read_bytes = limited_f.read() 93 self.assertEqual(num_bytes, len(num_read_bytes)) 94 expected_ms = 8.0 * num_bytes / (bps / float(request_count)) * 1000.0 95 actual_ms = (proxyshaper.TIMER() - start) * 1000.0 96 self.assertValuesAlmostEqual(expected_ms, actual_ms) 97 98 def testWriteLimitedBasic(self): 99 num_bytes = 1024 * 10 + 350 100 bps = 384000 101 request_counter = lambda: 1 102 f = StringIO.StringIO() 103 limited_f = proxyshaper.RateLimitedFile(request_counter, f, bps) 104 start = proxyshaper.TIMER() 105 limited_f.write(' ' * num_bytes) 106 self.assertEqual(num_bytes, len(limited_f.getvalue())) 107 expected_ms = 8.0 * num_bytes / bps * 1000.0 108 actual_ms = (proxyshaper.TIMER() - start) * 1000.0 109 self.assertValuesAlmostEqual(expected_ms, actual_ms) 110 111 def testWriteLimitedSlowedByMultipleRequests(self): 112 num_bytes = 1024 * 10 113 bps = 384000 114 request_count = 2 115 request_counter = lambda: request_count 116 f = StringIO.StringIO(' ' * num_bytes) 117 limited_f = proxyshaper.RateLimitedFile(request_counter, f, bps) 118 start = proxyshaper.TIMER() 119 limited_f.write(' ' * num_bytes) 120 self.assertEqual(num_bytes, len(limited_f.getvalue())) 121 expected_ms = 8.0 * num_bytes / (bps / float(request_count)) * 1000.0 122 actual_ms = (proxyshaper.TIMER() - start) * 1000.0 123 self.assertValuesAlmostEqual(expected_ms, actual_ms) 124 125 126 class GetBitsPerSecondTest(unittest.TestCase): 127 def testConvertsValidValues(self): 128 for dummynet_option, expected_bps in VALID_RATES: 129 bps = proxyshaper.GetBitsPerSecond(dummynet_option) 130 self.assertEqual( 131 expected_bps, bps, 'Unexpected result for %s: %s != %s' % ( 132 dummynet_option, expected_bps, bps)) 133 134 def testRaisesOnUnexpectedValues(self): 135 for dummynet_option in ERROR_RATES: 136 self.assertRaises(proxyshaper.BandwidthValueError, 137 proxyshaper.GetBitsPerSecond, dummynet_option) 138 139 140 if __name__ == '__main__': 141 unittest.main() 142