1 #!/usr/bin/env python 2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. 3 # Use of this source code is governed by a BSD-style license that can be 4 # found in the LICENSE file. 5 6 import cStringIO 7 import logging 8 import os 9 import sys 10 import textwrap 11 import unittest 12 13 BASE_PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) 14 sys.path.append(BASE_PATH) 15 16 from lib.bucket import Bucket 17 from lib.ordered_dict import OrderedDict 18 from lib.policy import Policy 19 from lib.symbol import SymbolMappingCache 20 from lib.symbol import FUNCTION_SYMBOLS, SOURCEFILE_SYMBOLS, TYPEINFO_SYMBOLS 21 22 import subcommands 23 24 25 class SymbolMappingCacheTest(unittest.TestCase): 26 class MockBucketSet(object): 27 def __init__(self, addresses): 28 self._addresses = addresses 29 30 def iter_addresses(self, symbol_type): # pylint: disable=W0613 31 for address in self._addresses: 32 yield address 33 34 class MockSymbolFinder(object): 35 def __init__(self, mapping): 36 self._mapping = mapping 37 38 def find(self, address_list): 39 result = OrderedDict() 40 for address in address_list: 41 result[address] = self._mapping[address] 42 return result 43 44 _TEST_FUNCTION_CACHE = textwrap.dedent("""\ 45 1 0x0000000000000001 46 7fc33eebcaa4 __gnu_cxx::new_allocator::allocate 47 7fc33ef69242 void DispatchToMethod 48 """) 49 50 _EXPECTED_TEST_FUNCTION_CACHE = textwrap.dedent("""\ 51 1 0x0000000000000001 52 7fc33eebcaa4 __gnu_cxx::new_allocator::allocate 53 7fc33ef69242 void DispatchToMethod 54 2 0x0000000000000002 55 7fc33ef7bc3e std::map::operator[] 56 7fc34411f9d5 WTF::RefCounted::operator new 57 """) 58 59 _TEST_FUNCTION_ADDRESS_LIST1 = [ 60 0x1, 0x7fc33eebcaa4, 0x7fc33ef69242] 61 62 _TEST_FUNCTION_ADDRESS_LIST2 = [ 63 0x1, 0x2, 0x7fc33eebcaa4, 0x7fc33ef69242, 0x7fc33ef7bc3e, 0x7fc34411f9d5] 64 65 _TEST_FUNCTION_DICT = { 66 0x1: '0x0000000000000001', 67 0x2: '0x0000000000000002', 68 0x7fc33eebcaa4: '__gnu_cxx::new_allocator::allocate', 69 0x7fc33ef69242: 'void DispatchToMethod', 70 0x7fc33ef7bc3e: 'std::map::operator[]', 71 0x7fc34411f9d5: 'WTF::RefCounted::operator new', 72 } 73 74 def test_update(self): 75 symbol_mapping_cache = SymbolMappingCache() 76 cache_f = cStringIO.StringIO() 77 cache_f.write(self._TEST_FUNCTION_CACHE) 78 79 # No update from self._TEST_FUNCTION_CACHE 80 symbol_mapping_cache.update( 81 FUNCTION_SYMBOLS, 82 self.MockBucketSet(self._TEST_FUNCTION_ADDRESS_LIST1), 83 self.MockSymbolFinder(self._TEST_FUNCTION_DICT), cache_f) 84 for address in self._TEST_FUNCTION_ADDRESS_LIST1: 85 self.assertEqual(self._TEST_FUNCTION_DICT[address], 86 symbol_mapping_cache.lookup(FUNCTION_SYMBOLS, address)) 87 self.assertEqual(self._TEST_FUNCTION_CACHE, cache_f.getvalue()) 88 89 # Update to self._TEST_FUNCTION_ADDRESS_LIST2 90 symbol_mapping_cache.update( 91 FUNCTION_SYMBOLS, 92 self.MockBucketSet(self._TEST_FUNCTION_ADDRESS_LIST2), 93 self.MockSymbolFinder(self._TEST_FUNCTION_DICT), cache_f) 94 for address in self._TEST_FUNCTION_ADDRESS_LIST2: 95 self.assertEqual(self._TEST_FUNCTION_DICT[address], 96 symbol_mapping_cache.lookup(FUNCTION_SYMBOLS, address)) 97 self.assertEqual(self._EXPECTED_TEST_FUNCTION_CACHE, cache_f.getvalue()) 98 99 100 class PolicyTest(unittest.TestCase): 101 class MockSymbolMappingCache(object): 102 def __init__(self): 103 self._symbol_caches = { 104 FUNCTION_SYMBOLS: {}, 105 SOURCEFILE_SYMBOLS: {}, 106 TYPEINFO_SYMBOLS: {}, 107 } 108 109 def add(self, symbol_type, address, symbol): 110 self._symbol_caches[symbol_type][address] = symbol 111 112 def lookup(self, symbol_type, address): 113 symbol = self._symbol_caches[symbol_type].get(address) 114 return symbol if symbol else '0x%016x' % address 115 116 _TEST_POLICY = textwrap.dedent("""\ 117 { 118 "components": [ 119 "second", 120 "mmap-v8", 121 "malloc-v8", 122 "malloc-WebKit", 123 "mmap-catch-all", 124 "malloc-catch-all" 125 ], 126 "rules": [ 127 { 128 "name": "second", 129 "stacktrace": "optional", 130 "allocator": "optional" 131 }, 132 { 133 "name": "mmap-v8", 134 "stacktrace": ".*v8::.*", 135 "allocator": "mmap" 136 }, 137 { 138 "name": "malloc-v8", 139 "stacktrace": ".*v8::.*", 140 "allocator": "malloc" 141 }, 142 { 143 "name": "malloc-WebKit", 144 "stacktrace": ".*WebKit::.*", 145 "allocator": "malloc" 146 }, 147 { 148 "name": "mmap-catch-all", 149 "stacktrace": ".*", 150 "allocator": "mmap" 151 }, 152 { 153 "name": "malloc-catch-all", 154 "stacktrace": ".*", 155 "allocator": "malloc" 156 } 157 ], 158 "version": "POLICY_DEEP_3" 159 } 160 """) 161 162 def test_load(self): 163 policy = Policy.parse(cStringIO.StringIO(self._TEST_POLICY), 'json') 164 self.assertTrue(policy) 165 self.assertEqual('POLICY_DEEP_3', policy.version) 166 167 def test_find(self): 168 policy = Policy.parse(cStringIO.StringIO(self._TEST_POLICY), 'json') 169 self.assertTrue(policy) 170 171 symbol_mapping_cache = self.MockSymbolMappingCache() 172 symbol_mapping_cache.add(FUNCTION_SYMBOLS, 0x1212, 'v8::create') 173 symbol_mapping_cache.add(FUNCTION_SYMBOLS, 0x1381, 'WebKit::create') 174 175 bucket1 = Bucket([0x1212, 0x013], 'malloc', 0x29492, '_Z') 176 bucket1.symbolize(symbol_mapping_cache) 177 bucket2 = Bucket([0x18242, 0x1381], 'malloc', 0x9492, '_Z') 178 bucket2.symbolize(symbol_mapping_cache) 179 bucket3 = Bucket([0x18242, 0x181], 'malloc', 0x949, '_Z') 180 bucket3.symbolize(symbol_mapping_cache) 181 182 self.assertEqual('malloc-v8', policy.find_malloc(bucket1)) 183 self.assertEqual('malloc-WebKit', policy.find_malloc(bucket2)) 184 self.assertEqual('malloc-catch-all', policy.find_malloc(bucket3)) 185 186 187 class BucketsCommandTest(unittest.TestCase): 188 def test(self): 189 BUCKETS_PATH = os.path.join(BASE_PATH, 'tests', 'output', 'buckets') 190 with open(BUCKETS_PATH) as output_f: 191 expected = output_f.read() 192 193 out = cStringIO.StringIO() 194 195 HEAP_PATH = os.path.join(BASE_PATH, 'tests', 'data', 'heap.01234.0001.heap') 196 subcommand = subcommands.BucketsCommand() 197 returncode = subcommand.do(['buckets', HEAP_PATH], out) 198 self.assertEqual(0, returncode) 199 self.assertEqual(expected, out.getvalue()) 200 201 202 class UploadCommandTest(unittest.TestCase): 203 def test(self): 204 MOCK_GSUTIL_PATH = os.path.join(BASE_PATH, 'tests', 'mock_gsutil.py') 205 HEAP_PATH = os.path.join(BASE_PATH, 'tests', 'data', 'heap.01234.0001.heap') 206 subcommand = subcommands.UploadCommand() 207 returncode = subcommand.do([ 208 'upload', 209 '--gsutil', 210 MOCK_GSUTIL_PATH, 211 HEAP_PATH, 212 'gs://test-storage/']) 213 self.assertEqual(0, returncode) 214 215 216 if __name__ == '__main__': 217 logging.basicConfig( 218 level=logging.DEBUG if '-v' in sys.argv else logging.ERROR, 219 format='%(levelname)5s %(filename)15s(%(lineno)3d): %(message)s') 220 unittest.main() 221