1 # Copyright 2014 The Chromium Authors. All rights reserved. 2 # Use of this source code is governed by a BSD-style license that can be 3 # found in the LICENSE file. 4 5 import collections 6 import datetime 7 import logging 8 import multiprocessing 9 import os 10 import posixpath 11 import Queue 12 import re 13 import subprocess 14 import sys 15 import threading 16 17 18 # addr2line builds a possibly infinite memory cache that can exhaust 19 # the computer's memory if allowed to grow for too long. This constant 20 # controls how many lookups we do before restarting the process. 4000 21 # gives near peak performance without extreme memory usage. 22 ADDR2LINE_RECYCLE_LIMIT = 4000 23 24 25 class ELFSymbolizer(object): 26 """An uber-fast (multiprocessing, pipelined and asynchronous) ELF symbolizer. 27 28 This class is a frontend for addr2line (part of GNU binutils), designed to 29 symbolize batches of large numbers of symbols for a given ELF file. It 30 supports sharding symbolization against many addr2line instances and 31 pipelining of multiple requests per each instance (in order to hide addr2line 32 internals and OS pipe latencies). 33 34 The interface exhibited by this class is a very simple asynchronous interface, 35 which is based on the following three methods: 36 - SymbolizeAsync(): used to request (enqueue) resolution of a given address. 37 - The |callback| method: used to communicated back the symbol information. 38 - Join(): called to conclude the batch to gather the last outstanding results. 39 In essence, before the Join method returns, this class will have issued as 40 many callbacks as the number of SymbolizeAsync() calls. In this regard, note 41 that due to multiprocess sharding, callbacks can be delivered out of order. 42 43 Some background about addr2line: 44 - it is invoked passing the elf path in the cmdline, piping the addresses in 45 its stdin and getting results on its stdout. 46 - it has pretty large response times for the first requests, but it 47 works very well in streaming mode once it has been warmed up. 48 - it doesn't scale by itself (on more cores). However, spawning multiple 49 instances at the same time on the same file is pretty efficient as they 50 keep hitting the pagecache and become mostly CPU bound. 51 - it might hang or crash, mostly for OOM. This class deals with both of these 52 problems. 53 54 Despite the "scary" imports and the multi* words above, (almost) no multi- 55 threading/processing is involved from the python viewpoint. Concurrency 56 here is achieved by spawning several addr2line subprocesses and handling their 57 output pipes asynchronously. Therefore, all the code here (with the exception 58 of the Queue instance in Addr2Line) should be free from mind-blowing 59 thread-safety concerns. 60 61 The multiprocess sharding works as follows: 62 The symbolizer tries to use the lowest number of addr2line instances as 63 possible (with respect of |max_concurrent_jobs|) and enqueue all the requests 64 in a single addr2line instance. For few symbols (i.e. dozens) sharding isn't 65 worth the startup cost. 66 The multiprocess logic kicks in as soon as the queues for the existing 67 instances grow. Specifically, once all the existing instances reach the 68 |max_queue_size| bound, a new addr2line instance is kicked in. 69 In the case of a very eager producer (i.e. all |max_concurrent_jobs| instances 70 have a backlog of |max_queue_size|), back-pressure is applied on the caller by 71 blocking the SymbolizeAsync method. 72 73 This module has been deliberately designed to be dependency free (w.r.t. of 74 other modules in this project), to allow easy reuse in external projects. 75 """ 76 77 def __init__(self, elf_file_path, addr2line_path, callback, inlines=False, 78 max_concurrent_jobs=None, addr2line_timeout=30, max_queue_size=50): 79 """Args: 80 elf_file_path: path of the elf file to be symbolized. 81 addr2line_path: path of the toolchain's addr2line binary. 82 callback: a callback which will be invoked for each resolved symbol with 83 the two args (sym_info, callback_arg). The former is an instance of 84 |ELFSymbolInfo| and contains the symbol information. The latter is an 85 embedder-provided argument which is passed to SymbolizeAsync(). 86 inlines: when True, the ELFSymbolInfo will contain also the details about 87 the outer inlining functions. When False, only the innermost function 88 will be provided. 89 max_concurrent_jobs: Max number of addr2line instances spawned. 90 Parallelize responsibly, addr2line is a memory and I/O monster. 91 max_queue_size: Max number of outstanding requests per addr2line instance. 92 addr2line_timeout: Max time (in seconds) to wait for a addr2line response. 93 After the timeout, the instance will be considered hung and respawned. 94 """ 95 assert(os.path.isfile(addr2line_path)), 'Cannot find ' + addr2line_path 96 self.elf_file_path = elf_file_path 97 self.addr2line_path = addr2line_path 98 self.callback = callback 99 self.inlines = inlines 100 self.max_concurrent_jobs = (max_concurrent_jobs or 101 min(multiprocessing.cpu_count(), 4)) 102 self.max_queue_size = max_queue_size 103 self.addr2line_timeout = addr2line_timeout 104 self.requests_counter = 0 # For generating monotonic request IDs. 105 self._a2l_instances = [] # Up to |max_concurrent_jobs| _Addr2Line inst. 106 107 # Create one addr2line instance. More instances will be created on demand 108 # (up to |max_concurrent_jobs|) depending on the rate of the requests. 109 self._CreateNewA2LInstance() 110 111 def SymbolizeAsync(self, addr, callback_arg=None): 112 """Requests symbolization of a given address. 113 114 This method is not guaranteed to return immediately. It generally does, but 115 in some scenarios (e.g. all addr2line instances have full queues) it can 116 block to create back-pressure. 117 118 Args: 119 addr: address to symbolize. 120 callback_arg: optional argument which will be passed to the |callback|.""" 121 assert(isinstance(addr, int)) 122 123 # Process all the symbols that have been resolved in the meanwhile. 124 # Essentially, this drains all the addr2line(s) out queues. 125 for a2l_to_purge in self._a2l_instances: 126 a2l_to_purge.ProcessAllResolvedSymbolsInQueue() 127 a2l_to_purge.RecycleIfNecessary() 128 129 # Find the best instance according to this logic: 130 # 1. Find an existing instance with the shortest queue. 131 # 2. If all of instances' queues are full, but there is room in the pool, 132 # (i.e. < |max_concurrent_jobs|) create a new instance. 133 # 3. If there were already |max_concurrent_jobs| instances and all of them 134 # had full queues, make back-pressure. 135 136 # 1. 137 def _SortByQueueSizeAndReqID(a2l): 138 return (a2l.queue_size, a2l.first_request_id) 139 a2l = min(self._a2l_instances, key=_SortByQueueSizeAndReqID) 140 141 # 2. 142 if (a2l.queue_size >= self.max_queue_size and 143 len(self._a2l_instances) < self.max_concurrent_jobs): 144 a2l = self._CreateNewA2LInstance() 145 146 # 3. 147 if a2l.queue_size >= self.max_queue_size: 148 a2l.WaitForNextSymbolInQueue() 149 150 a2l.EnqueueRequest(addr, callback_arg) 151 152 def Join(self): 153 """Waits for all the outstanding requests to complete and terminates.""" 154 for a2l in self._a2l_instances: 155 a2l.WaitForIdle() 156 a2l.Terminate() 157 158 def _CreateNewA2LInstance(self): 159 assert(len(self._a2l_instances) < self.max_concurrent_jobs) 160 a2l = ELFSymbolizer.Addr2Line(self) 161 self._a2l_instances.append(a2l) 162 return a2l 163 164 165 class Addr2Line(object): 166 """A python wrapper around an addr2line instance. 167 168 The communication with the addr2line process looks as follows: 169 [STDIN] [STDOUT] (from addr2line's viewpoint) 170 > f001111 171 > f002222 172 < Symbol::Name(foo, bar) for f001111 173 < /path/to/source/file.c:line_number 174 > f003333 175 < Symbol::Name2() for f002222 176 < /path/to/source/file.c:line_number 177 < Symbol::Name3() for f003333 178 < /path/to/source/file.c:line_number 179 """ 180 181 SYM_ADDR_RE = re.compile(r'([^:]+):(\?|\d+).*') 182 183 def __init__(self, symbolizer): 184 self._symbolizer = symbolizer 185 self._lib_file_name = posixpath.basename(symbolizer.elf_file_path) 186 187 # The request queue (i.e. addresses pushed to addr2line's stdin and not 188 # yet retrieved on stdout) 189 self._request_queue = collections.deque() 190 191 # This is essentially len(self._request_queue). It has been optimized to a 192 # separate field because turned out to be a perf hot-spot. 193 self.queue_size = 0 194 195 # Keep track of the number of symbols a process has processed to 196 # avoid a single process growing too big and using all the memory. 197 self._processed_symbols_count = 0 198 199 # Objects required to handle the addr2line subprocess. 200 self._proc = None # Subprocess.Popen(...) instance. 201 self._thread = None # Threading.thread instance. 202 self._out_queue = None # Queue.Queue instance (for buffering a2l stdout). 203 self._RestartAddr2LineProcess() 204 205 def EnqueueRequest(self, addr, callback_arg): 206 """Pushes an address to addr2line's stdin (and keeps track of it).""" 207 self._symbolizer.requests_counter += 1 # For global "age" of requests. 208 req_idx = self._symbolizer.requests_counter 209 self._request_queue.append((addr, callback_arg, req_idx)) 210 self.queue_size += 1 211 self._WriteToA2lStdin(addr) 212 213 def WaitForIdle(self): 214 """Waits until all the pending requests have been symbolized.""" 215 while self.queue_size > 0: 216 self.WaitForNextSymbolInQueue() 217 218 def WaitForNextSymbolInQueue(self): 219 """Waits for the next pending request to be symbolized.""" 220 if not self.queue_size: 221 return 222 223 # This outer loop guards against a2l hanging (detecting stdout timeout). 224 while True: 225 start_time = datetime.datetime.now() 226 timeout = datetime.timedelta(seconds=self._symbolizer.addr2line_timeout) 227 228 # The inner loop guards against a2l crashing (checking if it exited). 229 while (datetime.datetime.now() - start_time < timeout): 230 # poll() returns !None if the process exited. a2l should never exit. 231 if self._proc.poll(): 232 logging.warning('addr2line crashed, respawning (lib: %s).' % 233 self._lib_file_name) 234 self._RestartAddr2LineProcess() 235 # TODO(primiano): the best thing to do in this case would be 236 # shrinking the pool size as, very likely, addr2line is crashed 237 # due to low memory (and the respawned one will die again soon). 238 239 try: 240 lines = self._out_queue.get(block=True, timeout=0.25) 241 except Queue.Empty: 242 # On timeout (1/4 s.) repeat the inner loop and check if either the 243 # addr2line process did crash or we waited its output for too long. 244 continue 245 246 # In nominal conditions, we get straight to this point. 247 self._ProcessSymbolOutput(lines) 248 return 249 250 # If this point is reached, we waited more than |addr2line_timeout|. 251 logging.warning('Hung addr2line process, respawning (lib: %s).' % 252 self._lib_file_name) 253 self._RestartAddr2LineProcess() 254 255 def ProcessAllResolvedSymbolsInQueue(self): 256 """Consumes all the addr2line output lines produced (without blocking).""" 257 if not self.queue_size: 258 return 259 while True: 260 try: 261 lines = self._out_queue.get_nowait() 262 except Queue.Empty: 263 break 264 self._ProcessSymbolOutput(lines) 265 266 def RecycleIfNecessary(self): 267 """Restarts the process if it has been used for too long. 268 269 A long running addr2line process will consume excessive amounts 270 of memory without any gain in performance.""" 271 if self._processed_symbols_count >= ADDR2LINE_RECYCLE_LIMIT: 272 self._RestartAddr2LineProcess() 273 274 275 def Terminate(self): 276 """Kills the underlying addr2line process. 277 278 The poller |_thread| will terminate as well due to the broken pipe.""" 279 try: 280 self._proc.kill() 281 self._proc.communicate() # Essentially wait() without risking deadlock. 282 except Exception: # An exception while terminating? How interesting. 283 pass 284 self._proc = None 285 286 def _WriteToA2lStdin(self, addr): 287 self._proc.stdin.write('%s\n' % hex(addr)) 288 if self._symbolizer.inlines: 289 # In the case of inlines we output an extra blank line, which causes 290 # addr2line to emit a (??,??:0) tuple that we use as a boundary marker. 291 self._proc.stdin.write('\n') 292 self._proc.stdin.flush() 293 294 def _ProcessSymbolOutput(self, lines): 295 """Parses an addr2line symbol output and triggers the client callback.""" 296 (_, callback_arg, _) = self._request_queue.popleft() 297 self.queue_size -= 1 298 299 innermost_sym_info = None 300 sym_info = None 301 for (line1, line2) in lines: 302 prev_sym_info = sym_info 303 name = line1 if not line1.startswith('?') else None 304 source_path = None 305 source_line = None 306 m = ELFSymbolizer.Addr2Line.SYM_ADDR_RE.match(line2) 307 if m: 308 if not m.group(1).startswith('?'): 309 source_path = m.group(1) 310 if not m.group(2).startswith('?'): 311 source_line = int(m.group(2)) 312 else: 313 logging.warning('Got invalid symbol path from addr2line: %s' % line2) 314 315 sym_info = ELFSymbolInfo(name, source_path, source_line) 316 if prev_sym_info: 317 prev_sym_info.inlined_by = sym_info 318 if not innermost_sym_info: 319 innermost_sym_info = sym_info 320 321 self._processed_symbols_count += 1 322 self._symbolizer.callback(innermost_sym_info, callback_arg) 323 324 def _RestartAddr2LineProcess(self): 325 if self._proc: 326 self.Terminate() 327 328 # The only reason of existence of this Queue (and the corresponding 329 # Thread below) is the lack of a subprocess.stdout.poll_avail_lines(). 330 # Essentially this is a pipe able to extract a couple of lines atomically. 331 self._out_queue = Queue.Queue() 332 333 # Start the underlying addr2line process in line buffered mode. 334 335 cmd = [self._symbolizer.addr2line_path, '--functions', '--demangle', 336 '--exe=' + self._symbolizer.elf_file_path] 337 if self._symbolizer.inlines: 338 cmd += ['--inlines'] 339 self._proc = subprocess.Popen(cmd, bufsize=1, stdout=subprocess.PIPE, 340 stdin=subprocess.PIPE, stderr=sys.stderr, close_fds=True) 341 342 # Start the poller thread, which simply moves atomically the lines read 343 # from the addr2line's stdout to the |_out_queue|. 344 self._thread = threading.Thread( 345 target=ELFSymbolizer.Addr2Line.StdoutReaderThread, 346 args=(self._proc.stdout, self._out_queue, self._symbolizer.inlines)) 347 self._thread.daemon = True # Don't prevent early process exit. 348 self._thread.start() 349 350 self._processed_symbols_count = 0 351 352 # Replay the pending requests on the new process (only for the case 353 # of a hung addr2line timing out during the game). 354 for (addr, _, _) in self._request_queue: 355 self._WriteToA2lStdin(addr) 356 357 @staticmethod 358 def StdoutReaderThread(process_pipe, queue, inlines): 359 """The poller thread fn, which moves the addr2line stdout to the |queue|. 360 361 This is the only piece of code not running on the main thread. It merely 362 writes to a Queue, which is thread-safe. In the case of inlines, it 363 detects the ??,??:0 marker and sends the lines atomically, such that the 364 main thread always receives all the lines corresponding to one symbol in 365 one shot.""" 366 try: 367 lines_for_one_symbol = [] 368 while True: 369 line1 = process_pipe.readline().rstrip('\r\n') 370 line2 = process_pipe.readline().rstrip('\r\n') 371 if not line1 or not line2: 372 break 373 inline_has_more_lines = inlines and (len(lines_for_one_symbol) == 0 or 374 (line1 != '??' and line2 != '??:0')) 375 if not inlines or inline_has_more_lines: 376 lines_for_one_symbol += [(line1, line2)] 377 if inline_has_more_lines: 378 continue 379 queue.put(lines_for_one_symbol) 380 lines_for_one_symbol = [] 381 process_pipe.close() 382 383 # Every addr2line processes will die at some point, please die silently. 384 except (IOError, OSError): 385 pass 386 387 @property 388 def first_request_id(self): 389 """Returns the request_id of the oldest pending request in the queue.""" 390 return self._request_queue[0][2] if self._request_queue else 0 391 392 393 class ELFSymbolInfo(object): 394 """The result of the symbolization passed as first arg. of each callback.""" 395 396 def __init__(self, name, source_path, source_line): 397 """All the fields here can be None (if addr2line replies with '??').""" 398 self.name = name 399 self.source_path = source_path 400 self.source_line = source_line 401 # In the case of |inlines|=True, the |inlined_by| points to the outer 402 # function inlining the current one (and so on, to form a chain). 403 self.inlined_by = None 404 405 def __str__(self): 406 return '%s [%s:%d]' % ( 407 self.name or '??', self.source_path or '??', self.source_line or 0) 408