1 # Copyright 2014 The Chromium Authors. All rights reserved. 2 # Use of this source code is governed by a BSD-style license that can be 3 # found in the LICENSE file. 4 5 import collections 6 import datetime 7 import logging 8 import multiprocessing 9 import os 10 import posixpath 11 import Queue 12 import re 13 import subprocess 14 import sys 15 import threading 16 import time 17 18 19 # addr2line builds a possibly infinite memory cache that can exhaust 20 # the computer's memory if allowed to grow for too long. This constant 21 # controls how many lookups we do before restarting the process. 4000 22 # gives near peak performance without extreme memory usage. 23 ADDR2LINE_RECYCLE_LIMIT = 4000 24 25 26 class ELFSymbolizer(object): 27 """An uber-fast (multiprocessing, pipelined and asynchronous) ELF symbolizer. 28 29 This class is a frontend for addr2line (part of GNU binutils), designed to 30 symbolize batches of large numbers of symbols for a given ELF file. It 31 supports sharding symbolization against many addr2line instances and 32 pipelining of multiple requests per each instance (in order to hide addr2line 33 internals and OS pipe latencies). 34 35 The interface exhibited by this class is a very simple asynchronous interface, 36 which is based on the following three methods: 37 - SymbolizeAsync(): used to request (enqueue) resolution of a given address. 38 - The |callback| method: used to communicated back the symbol information. 39 - Join(): called to conclude the batch to gather the last outstanding results. 40 In essence, before the Join method returns, this class will have issued as 41 many callbacks as the number of SymbolizeAsync() calls. In this regard, note 42 that due to multiprocess sharding, callbacks can be delivered out of order. 43 44 Some background about addr2line: 45 - it is invoked passing the elf path in the cmdline, piping the addresses in 46 its stdin and getting results on its stdout. 47 - it has pretty large response times for the first requests, but it 48 works very well in streaming mode once it has been warmed up. 49 - it doesn't scale by itself (on more cores). However, spawning multiple 50 instances at the same time on the same file is pretty efficient as they 51 keep hitting the pagecache and become mostly CPU bound. 52 - it might hang or crash, mostly for OOM. This class deals with both of these 53 problems. 54 55 Despite the "scary" imports and the multi* words above, (almost) no multi- 56 threading/processing is involved from the python viewpoint. Concurrency 57 here is achieved by spawning several addr2line subprocesses and handling their 58 output pipes asynchronously. Therefore, all the code here (with the exception 59 of the Queue instance in Addr2Line) should be free from mind-blowing 60 thread-safety concerns. 61 62 The multiprocess sharding works as follows: 63 The symbolizer tries to use the lowest number of addr2line instances as 64 possible (with respect of |max_concurrent_jobs|) and enqueue all the requests 65 in a single addr2line instance. For few symbols (i.e. dozens) sharding isn't 66 worth the startup cost. 67 The multiprocess logic kicks in as soon as the queues for the existing 68 instances grow. Specifically, once all the existing instances reach the 69 |max_queue_size| bound, a new addr2line instance is kicked in. 70 In the case of a very eager producer (i.e. all |max_concurrent_jobs| instances 71 have a backlog of |max_queue_size|), back-pressure is applied on the caller by 72 blocking the SymbolizeAsync method. 73 74 This module has been deliberately designed to be dependency free (w.r.t. of 75 other modules in this project), to allow easy reuse in external projects. 76 """ 77 78 def __init__(self, elf_file_path, addr2line_path, callback, inlines=False, 79 max_concurrent_jobs=None, addr2line_timeout=30, max_queue_size=50, 80 source_root_path=None, strip_base_path=None): 81 """Args: 82 elf_file_path: path of the elf file to be symbolized. 83 addr2line_path: path of the toolchain's addr2line binary. 84 callback: a callback which will be invoked for each resolved symbol with 85 the two args (sym_info, callback_arg). The former is an instance of 86 |ELFSymbolInfo| and contains the symbol information. The latter is an 87 embedder-provided argument which is passed to SymbolizeAsync(). 88 inlines: when True, the ELFSymbolInfo will contain also the details about 89 the outer inlining functions. When False, only the innermost function 90 will be provided. 91 max_concurrent_jobs: Max number of addr2line instances spawned. 92 Parallelize responsibly, addr2line is a memory and I/O monster. 93 max_queue_size: Max number of outstanding requests per addr2line instance. 94 addr2line_timeout: Max time (in seconds) to wait for a addr2line response. 95 After the timeout, the instance will be considered hung and respawned. 96 source_root_path: In some toolchains only the name of the source file is 97 is output, without any path information; disambiguation searches 98 through the source directory specified by |source_root_path| argument 99 for files whose name matches, adding the full path information to the 100 output. For example, if the toolchain outputs "unicode.cc" and there 101 is a file called "unicode.cc" located under |source_root_path|/foo, 102 the tool will replace "unicode.cc" with 103 "|source_root_path|/foo/unicode.cc". If there are multiple files with 104 the same name, disambiguation will fail because the tool cannot 105 determine which of the files was the source of the symbol. 106 strip_base_path: Rebases the symbols source paths onto |source_root_path| 107 (i.e replace |strip_base_path| with |source_root_path). 108 """ 109 assert(os.path.isfile(addr2line_path)), 'Cannot find ' + addr2line_path 110 self.elf_file_path = elf_file_path 111 self.addr2line_path = addr2line_path 112 self.callback = callback 113 self.inlines = inlines 114 self.max_concurrent_jobs = (max_concurrent_jobs or 115 min(multiprocessing.cpu_count(), 4)) 116 self.max_queue_size = max_queue_size 117 self.addr2line_timeout = addr2line_timeout 118 self.requests_counter = 0 # For generating monotonic request IDs. 119 self._a2l_instances = [] # Up to |max_concurrent_jobs| _Addr2Line inst. 120 121 # If necessary, create disambiguation lookup table 122 self.disambiguate = source_root_path is not None 123 self.disambiguation_table = {} 124 self.strip_base_path = strip_base_path 125 if(self.disambiguate): 126 self.source_root_path = os.path.abspath(source_root_path) 127 self._CreateDisambiguationTable() 128 129 # Create one addr2line instance. More instances will be created on demand 130 # (up to |max_concurrent_jobs|) depending on the rate of the requests. 131 self._CreateNewA2LInstance() 132 133 def SymbolizeAsync(self, addr, callback_arg=None): 134 """Requests symbolization of a given address. 135 136 This method is not guaranteed to return immediately. It generally does, but 137 in some scenarios (e.g. all addr2line instances have full queues) it can 138 block to create back-pressure. 139 140 Args: 141 addr: address to symbolize. 142 callback_arg: optional argument which will be passed to the |callback|.""" 143 assert(isinstance(addr, int)) 144 145 # Process all the symbols that have been resolved in the meanwhile. 146 # Essentially, this drains all the addr2line(s) out queues. 147 for a2l_to_purge in self._a2l_instances: 148 a2l_to_purge.ProcessAllResolvedSymbolsInQueue() 149 a2l_to_purge.RecycleIfNecessary() 150 151 # Find the best instance according to this logic: 152 # 1. Find an existing instance with the shortest queue. 153 # 2. If all of instances' queues are full, but there is room in the pool, 154 # (i.e. < |max_concurrent_jobs|) create a new instance. 155 # 3. If there were already |max_concurrent_jobs| instances and all of them 156 # had full queues, make back-pressure. 157 158 # 1. 159 def _SortByQueueSizeAndReqID(a2l): 160 return (a2l.queue_size, a2l.first_request_id) 161 a2l = min(self._a2l_instances, key=_SortByQueueSizeAndReqID) 162 163 # 2. 164 if (a2l.queue_size >= self.max_queue_size and 165 len(self._a2l_instances) < self.max_concurrent_jobs): 166 a2l = self._CreateNewA2LInstance() 167 168 # 3. 169 if a2l.queue_size >= self.max_queue_size: 170 a2l.WaitForNextSymbolInQueue() 171 172 a2l.EnqueueRequest(addr, callback_arg) 173 174 def Join(self): 175 """Waits for all the outstanding requests to complete and terminates.""" 176 for a2l in self._a2l_instances: 177 a2l.WaitForIdle() 178 a2l.Terminate() 179 180 def _CreateNewA2LInstance(self): 181 assert(len(self._a2l_instances) < self.max_concurrent_jobs) 182 a2l = ELFSymbolizer.Addr2Line(self) 183 self._a2l_instances.append(a2l) 184 return a2l 185 186 def _CreateDisambiguationTable(self): 187 """ Non-unique file names will result in None entries""" 188 start_time = time.time() 189 logging.info('Collecting information about available source files...') 190 self.disambiguation_table = {} 191 192 for root, _, filenames in os.walk(self.source_root_path): 193 for f in filenames: 194 self.disambiguation_table[f] = os.path.join(root, f) if (f not in 195 self.disambiguation_table) else None 196 logging.info('Finished collecting information about ' 197 'possible files (took %.1f s).', 198 (time.time() - start_time)) 199 200 201 class Addr2Line(object): 202 """A python wrapper around an addr2line instance. 203 204 The communication with the addr2line process looks as follows: 205 [STDIN] [STDOUT] (from addr2line's viewpoint) 206 > f001111 207 > f002222 208 < Symbol::Name(foo, bar) for f001111 209 < /path/to/source/file.c:line_number 210 > f003333 211 < Symbol::Name2() for f002222 212 < /path/to/source/file.c:line_number 213 < Symbol::Name3() for f003333 214 < /path/to/source/file.c:line_number 215 """ 216 217 SYM_ADDR_RE = re.compile(r'([^:]+):(\?|\d+).*') 218 219 def __init__(self, symbolizer): 220 self._symbolizer = symbolizer 221 self._lib_file_name = posixpath.basename(symbolizer.elf_file_path) 222 223 # The request queue (i.e. addresses pushed to addr2line's stdin and not 224 # yet retrieved on stdout) 225 self._request_queue = collections.deque() 226 227 # This is essentially len(self._request_queue). It has been optimized to a 228 # separate field because turned out to be a perf hot-spot. 229 self.queue_size = 0 230 231 # Keep track of the number of symbols a process has processed to 232 # avoid a single process growing too big and using all the memory. 233 self._processed_symbols_count = 0 234 235 # Objects required to handle the addr2line subprocess. 236 self._proc = None # Subprocess.Popen(...) instance. 237 self._thread = None # Threading.thread instance. 238 self._out_queue = None # Queue.Queue instance (for buffering a2l stdout). 239 self._RestartAddr2LineProcess() 240 241 def EnqueueRequest(self, addr, callback_arg): 242 """Pushes an address to addr2line's stdin (and keeps track of it).""" 243 self._symbolizer.requests_counter += 1 # For global "age" of requests. 244 req_idx = self._symbolizer.requests_counter 245 self._request_queue.append((addr, callback_arg, req_idx)) 246 self.queue_size += 1 247 self._WriteToA2lStdin(addr) 248 249 def WaitForIdle(self): 250 """Waits until all the pending requests have been symbolized.""" 251 while self.queue_size > 0: 252 self.WaitForNextSymbolInQueue() 253 254 def WaitForNextSymbolInQueue(self): 255 """Waits for the next pending request to be symbolized.""" 256 if not self.queue_size: 257 return 258 259 # This outer loop guards against a2l hanging (detecting stdout timeout). 260 while True: 261 start_time = datetime.datetime.now() 262 timeout = datetime.timedelta(seconds=self._symbolizer.addr2line_timeout) 263 264 # The inner loop guards against a2l crashing (checking if it exited). 265 while (datetime.datetime.now() - start_time < timeout): 266 # poll() returns !None if the process exited. a2l should never exit. 267 if self._proc.poll(): 268 logging.warning('addr2line crashed, respawning (lib: %s).' % 269 self._lib_file_name) 270 self._RestartAddr2LineProcess() 271 # TODO(primiano): the best thing to do in this case would be 272 # shrinking the pool size as, very likely, addr2line is crashed 273 # due to low memory (and the respawned one will die again soon). 274 275 try: 276 lines = self._out_queue.get(block=True, timeout=0.25) 277 except Queue.Empty: 278 # On timeout (1/4 s.) repeat the inner loop and check if either the 279 # addr2line process did crash or we waited its output for too long. 280 continue 281 282 # In nominal conditions, we get straight to this point. 283 self._ProcessSymbolOutput(lines) 284 return 285 286 # If this point is reached, we waited more than |addr2line_timeout|. 287 logging.warning('Hung addr2line process, respawning (lib: %s).' % 288 self._lib_file_name) 289 self._RestartAddr2LineProcess() 290 291 def ProcessAllResolvedSymbolsInQueue(self): 292 """Consumes all the addr2line output lines produced (without blocking).""" 293 if not self.queue_size: 294 return 295 while True: 296 try: 297 lines = self._out_queue.get_nowait() 298 except Queue.Empty: 299 break 300 self._ProcessSymbolOutput(lines) 301 302 def RecycleIfNecessary(self): 303 """Restarts the process if it has been used for too long. 304 305 A long running addr2line process will consume excessive amounts 306 of memory without any gain in performance.""" 307 if self._processed_symbols_count >= ADDR2LINE_RECYCLE_LIMIT: 308 self._RestartAddr2LineProcess() 309 310 311 def Terminate(self): 312 """Kills the underlying addr2line process. 313 314 The poller |_thread| will terminate as well due to the broken pipe.""" 315 try: 316 self._proc.kill() 317 self._proc.communicate() # Essentially wait() without risking deadlock. 318 except Exception: # An exception while terminating? How interesting. 319 pass 320 self._proc = None 321 322 def _WriteToA2lStdin(self, addr): 323 self._proc.stdin.write('%s\n' % hex(addr)) 324 if self._symbolizer.inlines: 325 # In the case of inlines we output an extra blank line, which causes 326 # addr2line to emit a (??,??:0) tuple that we use as a boundary marker. 327 self._proc.stdin.write('\n') 328 self._proc.stdin.flush() 329 330 def _ProcessSymbolOutput(self, lines): 331 """Parses an addr2line symbol output and triggers the client callback.""" 332 (_, callback_arg, _) = self._request_queue.popleft() 333 self.queue_size -= 1 334 335 innermost_sym_info = None 336 sym_info = None 337 for (line1, line2) in lines: 338 prev_sym_info = sym_info 339 name = line1 if not line1.startswith('?') else None 340 source_path = None 341 source_line = None 342 m = ELFSymbolizer.Addr2Line.SYM_ADDR_RE.match(line2) 343 if m: 344 if not m.group(1).startswith('?'): 345 source_path = m.group(1) 346 if not m.group(2).startswith('?'): 347 source_line = int(m.group(2)) 348 else: 349 logging.warning('Got invalid symbol path from addr2line: %s' % line2) 350 351 # In case disambiguation is on, and needed 352 was_ambiguous = False 353 disambiguated = False 354 if self._symbolizer.disambiguate: 355 if source_path and not posixpath.isabs(source_path): 356 path = self._symbolizer.disambiguation_table.get(source_path) 357 was_ambiguous = True 358 disambiguated = path is not None 359 source_path = path if disambiguated else source_path 360 361 # Use absolute paths (so that paths are consistent, as disambiguation 362 # uses absolute paths) 363 if source_path and not was_ambiguous: 364 source_path = os.path.abspath(source_path) 365 366 if source_path and self._symbolizer.strip_base_path: 367 # Strip the base path 368 source_path = re.sub('^' + self._symbolizer.strip_base_path, 369 self._symbolizer.source_root_path or '', source_path) 370 371 sym_info = ELFSymbolInfo(name, source_path, source_line, was_ambiguous, 372 disambiguated) 373 if prev_sym_info: 374 prev_sym_info.inlined_by = sym_info 375 if not innermost_sym_info: 376 innermost_sym_info = sym_info 377 378 self._processed_symbols_count += 1 379 self._symbolizer.callback(innermost_sym_info, callback_arg) 380 381 def _RestartAddr2LineProcess(self): 382 if self._proc: 383 self.Terminate() 384 385 # The only reason of existence of this Queue (and the corresponding 386 # Thread below) is the lack of a subprocess.stdout.poll_avail_lines(). 387 # Essentially this is a pipe able to extract a couple of lines atomically. 388 self._out_queue = Queue.Queue() 389 390 # Start the underlying addr2line process in line buffered mode. 391 392 cmd = [self._symbolizer.addr2line_path, '--functions', '--demangle', 393 '--exe=' + self._symbolizer.elf_file_path] 394 if self._symbolizer.inlines: 395 cmd += ['--inlines'] 396 self._proc = subprocess.Popen(cmd, bufsize=1, stdout=subprocess.PIPE, 397 stdin=subprocess.PIPE, stderr=sys.stderr, close_fds=True) 398 399 # Start the poller thread, which simply moves atomically the lines read 400 # from the addr2line's stdout to the |_out_queue|. 401 self._thread = threading.Thread( 402 target=ELFSymbolizer.Addr2Line.StdoutReaderThread, 403 args=(self._proc.stdout, self._out_queue, self._symbolizer.inlines)) 404 self._thread.daemon = True # Don't prevent early process exit. 405 self._thread.start() 406 407 self._processed_symbols_count = 0 408 409 # Replay the pending requests on the new process (only for the case 410 # of a hung addr2line timing out during the game). 411 for (addr, _, _) in self._request_queue: 412 self._WriteToA2lStdin(addr) 413 414 @staticmethod 415 def StdoutReaderThread(process_pipe, queue, inlines): 416 """The poller thread fn, which moves the addr2line stdout to the |queue|. 417 418 This is the only piece of code not running on the main thread. It merely 419 writes to a Queue, which is thread-safe. In the case of inlines, it 420 detects the ??,??:0 marker and sends the lines atomically, such that the 421 main thread always receives all the lines corresponding to one symbol in 422 one shot.""" 423 try: 424 lines_for_one_symbol = [] 425 while True: 426 line1 = process_pipe.readline().rstrip('\r\n') 427 line2 = process_pipe.readline().rstrip('\r\n') 428 if not line1 or not line2: 429 break 430 inline_has_more_lines = inlines and (len(lines_for_one_symbol) == 0 or 431 (line1 != '??' and line2 != '??:0')) 432 if not inlines or inline_has_more_lines: 433 lines_for_one_symbol += [(line1, line2)] 434 if inline_has_more_lines: 435 continue 436 queue.put(lines_for_one_symbol) 437 lines_for_one_symbol = [] 438 process_pipe.close() 439 440 # Every addr2line processes will die at some point, please die silently. 441 except (IOError, OSError): 442 pass 443 444 @property 445 def first_request_id(self): 446 """Returns the request_id of the oldest pending request in the queue.""" 447 return self._request_queue[0][2] if self._request_queue else 0 448 449 450 class ELFSymbolInfo(object): 451 """The result of the symbolization passed as first arg. of each callback.""" 452 453 def __init__(self, name, source_path, source_line, was_ambiguous=False, 454 disambiguated=False): 455 """All the fields here can be None (if addr2line replies with '??').""" 456 self.name = name 457 self.source_path = source_path 458 self.source_line = source_line 459 # In the case of |inlines|=True, the |inlined_by| points to the outer 460 # function inlining the current one (and so on, to form a chain). 461 self.inlined_by = None 462 self.disambiguated = disambiguated 463 self.was_ambiguous = was_ambiguous 464 465 def __str__(self): 466 return '%s [%s:%d]' % ( 467 self.name or '??', self.source_path or '??', self.source_line or 0) 468