Home | History | Annotate | Download | only in symbols
      1 # Copyright 2014 The Chromium Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import collections
      6 import datetime
      7 import logging
      8 import multiprocessing
      9 import os
     10 import posixpath
     11 import Queue
     12 import re
     13 import subprocess
     14 import sys
     15 import threading
     16 
     17 
     18 # addr2line builds a possibly infinite memory cache that can exhaust
     19 # the computer's memory if allowed to grow for too long. This constant
     20 # controls how many lookups we do before restarting the process. 4000
     21 # gives near peak performance without extreme memory usage.
     22 ADDR2LINE_RECYCLE_LIMIT = 4000
     23 
     24 
     25 class ELFSymbolizer(object):
     26   """An uber-fast (multiprocessing, pipelined and asynchronous) ELF symbolizer.
     27 
     28   This class is a frontend for addr2line (part of GNU binutils), designed to
     29   symbolize batches of large numbers of symbols for a given ELF file. It
     30   supports sharding symbolization against many addr2line instances and
     31   pipelining of multiple requests per each instance (in order to hide addr2line
     32   internals and OS pipe latencies).
     33 
     34   The interface exhibited by this class is a very simple asynchronous interface,
     35   which is based on the following three methods:
     36   - SymbolizeAsync(): used to request (enqueue) resolution of a given address.
     37   - The |callback| method: used to communicated back the symbol information.
     38   - Join(): called to conclude the batch to gather the last outstanding results.
     39   In essence, before the Join method returns, this class will have issued as
     40   many callbacks as the number of SymbolizeAsync() calls. In this regard, note
     41   that due to multiprocess sharding, callbacks can be delivered out of order.
     42 
     43   Some background about addr2line:
     44   - it is invoked passing the elf path in the cmdline, piping the addresses in
     45     its stdin and getting results on its stdout.
     46   - it has pretty large response times for the first requests, but it
     47     works very well in streaming mode once it has been warmed up.
     48   - it doesn't scale by itself (on more cores). However, spawning multiple
     49     instances at the same time on the same file is pretty efficient as they
     50     keep hitting the pagecache and become mostly CPU bound.
     51   - it might hang or crash, mostly for OOM. This class deals with both of these
     52     problems.
     53 
     54   Despite the "scary" imports and the multi* words above, (almost) no multi-
     55   threading/processing is involved from the python viewpoint. Concurrency
     56   here is achieved by spawning several addr2line subprocesses and handling their
     57   output pipes asynchronously. Therefore, all the code here (with the exception
     58   of the Queue instance in Addr2Line) should be free from mind-blowing
     59   thread-safety concerns.
     60 
     61   The multiprocess sharding works as follows:
     62   The symbolizer tries to use the lowest number of addr2line instances as
     63   possible (with respect of |max_concurrent_jobs|) and enqueue all the requests
     64   in a single addr2line instance. For few symbols (i.e. dozens) sharding isn't
     65   worth the startup cost.
     66   The multiprocess logic kicks in as soon as the queues for the existing
     67   instances grow. Specifically, once all the existing instances reach the
     68   |max_queue_size| bound, a new addr2line instance is kicked in.
     69   In the case of a very eager producer (i.e. all |max_concurrent_jobs| instances
     70   have a backlog of |max_queue_size|), back-pressure is applied on the caller by
     71   blocking the SymbolizeAsync method.
     72 
     73   This module has been deliberately designed to be dependency free (w.r.t. of
     74   other modules in this project), to allow easy reuse in external projects.
     75   """
     76 
     77   def __init__(self, elf_file_path, addr2line_path, callback, inlines=False,
     78       max_concurrent_jobs=None, addr2line_timeout=30, max_queue_size=50):
     79     """Args:
     80       elf_file_path: path of the elf file to be symbolized.
     81       addr2line_path: path of the toolchain's addr2line binary.
     82       callback: a callback which will be invoked for each resolved symbol with
     83           the two args (sym_info, callback_arg). The former is an instance of
     84           |ELFSymbolInfo| and contains the symbol information. The latter is an
     85           embedder-provided argument which is passed to SymbolizeAsync().
     86       inlines: when True, the ELFSymbolInfo will contain also the details about
     87           the outer inlining functions. When False, only the innermost function
     88           will be provided.
     89       max_concurrent_jobs: Max number of addr2line instances spawned.
     90           Parallelize responsibly, addr2line is a memory and I/O monster.
     91       max_queue_size: Max number of outstanding requests per addr2line instance.
     92       addr2line_timeout: Max time (in seconds) to wait for a addr2line response.
     93           After the timeout, the instance will be considered hung and respawned.
     94     """
     95     assert(os.path.isfile(addr2line_path)), 'Cannot find ' + addr2line_path
     96     self.elf_file_path = elf_file_path
     97     self.addr2line_path = addr2line_path
     98     self.callback = callback
     99     self.inlines = inlines
    100     self.max_concurrent_jobs = (max_concurrent_jobs or
    101                                 min(multiprocessing.cpu_count(), 4))
    102     self.max_queue_size = max_queue_size
    103     self.addr2line_timeout = addr2line_timeout
    104     self.requests_counter = 0  # For generating monotonic request IDs.
    105     self._a2l_instances = []  # Up to |max_concurrent_jobs| _Addr2Line inst.
    106 
    107     # Create one addr2line instance. More instances will be created on demand
    108     # (up to |max_concurrent_jobs|) depending on the rate of the requests.
    109     self._CreateNewA2LInstance()
    110 
    111   def SymbolizeAsync(self, addr, callback_arg=None):
    112     """Requests symbolization of a given address.
    113 
    114     This method is not guaranteed to return immediately. It generally does, but
    115     in some scenarios (e.g. all addr2line instances have full queues) it can
    116     block to create back-pressure.
    117 
    118     Args:
    119       addr: address to symbolize.
    120       callback_arg: optional argument which will be passed to the |callback|."""
    121     assert(isinstance(addr, int))
    122 
    123     # Process all the symbols that have been resolved in the meanwhile.
    124     # Essentially, this drains all the addr2line(s) out queues.
    125     for a2l_to_purge in self._a2l_instances:
    126       a2l_to_purge.ProcessAllResolvedSymbolsInQueue()
    127       a2l_to_purge.RecycleIfNecessary()
    128 
    129     # Find the best instance according to this logic:
    130     # 1. Find an existing instance with the shortest queue.
    131     # 2. If all of instances' queues are full, but there is room in the pool,
    132     #    (i.e. < |max_concurrent_jobs|) create a new instance.
    133     # 3. If there were already |max_concurrent_jobs| instances and all of them
    134     #    had full queues, make back-pressure.
    135 
    136     # 1.
    137     def _SortByQueueSizeAndReqID(a2l):
    138       return (a2l.queue_size, a2l.first_request_id)
    139     a2l = min(self._a2l_instances, key=_SortByQueueSizeAndReqID)
    140 
    141     # 2.
    142     if (a2l.queue_size >= self.max_queue_size and
    143         len(self._a2l_instances) < self.max_concurrent_jobs):
    144       a2l = self._CreateNewA2LInstance()
    145 
    146     # 3.
    147     if a2l.queue_size >= self.max_queue_size:
    148       a2l.WaitForNextSymbolInQueue()
    149 
    150     a2l.EnqueueRequest(addr, callback_arg)
    151 
    152   def Join(self):
    153     """Waits for all the outstanding requests to complete and terminates."""
    154     for a2l in self._a2l_instances:
    155       a2l.WaitForIdle()
    156       a2l.Terminate()
    157 
    158   def _CreateNewA2LInstance(self):
    159     assert(len(self._a2l_instances) < self.max_concurrent_jobs)
    160     a2l = ELFSymbolizer.Addr2Line(self)
    161     self._a2l_instances.append(a2l)
    162     return a2l
    163 
    164 
    165   class Addr2Line(object):
    166     """A python wrapper around an addr2line instance.
    167 
    168     The communication with the addr2line process looks as follows:
    169       [STDIN]         [STDOUT]  (from addr2line's viewpoint)
    170     > f001111
    171     > f002222
    172                     < Symbol::Name(foo, bar) for f001111
    173                     < /path/to/source/file.c:line_number
    174     > f003333
    175                     < Symbol::Name2() for f002222
    176                     < /path/to/source/file.c:line_number
    177                     < Symbol::Name3() for f003333
    178                     < /path/to/source/file.c:line_number
    179     """
    180 
    181     SYM_ADDR_RE = re.compile(r'([^:]+):(\?|\d+).*')
    182 
    183     def __init__(self, symbolizer):
    184       self._symbolizer = symbolizer
    185       self._lib_file_name = posixpath.basename(symbolizer.elf_file_path)
    186 
    187       # The request queue (i.e. addresses pushed to addr2line's stdin and not
    188       # yet retrieved on stdout)
    189       self._request_queue = collections.deque()
    190 
    191       # This is essentially len(self._request_queue). It has been optimized to a
    192       # separate field because turned out to be a perf hot-spot.
    193       self.queue_size = 0
    194 
    195       # Keep track of the number of symbols a process has processed to
    196       # avoid a single process growing too big and using all the memory.
    197       self._processed_symbols_count = 0
    198 
    199       # Objects required to handle the addr2line subprocess.
    200       self._proc = None  # Subprocess.Popen(...) instance.
    201       self._thread = None  # Threading.thread instance.
    202       self._out_queue = None  # Queue.Queue instance (for buffering a2l stdout).
    203       self._RestartAddr2LineProcess()
    204 
    205     def EnqueueRequest(self, addr, callback_arg):
    206       """Pushes an address to addr2line's stdin (and keeps track of it)."""
    207       self._symbolizer.requests_counter += 1  # For global "age" of requests.
    208       req_idx = self._symbolizer.requests_counter
    209       self._request_queue.append((addr, callback_arg, req_idx))
    210       self.queue_size += 1
    211       self._WriteToA2lStdin(addr)
    212 
    213     def WaitForIdle(self):
    214       """Waits until all the pending requests have been symbolized."""
    215       while self.queue_size > 0:
    216         self.WaitForNextSymbolInQueue()
    217 
    218     def WaitForNextSymbolInQueue(self):
    219       """Waits for the next pending request to be symbolized."""
    220       if not self.queue_size:
    221         return
    222 
    223       # This outer loop guards against a2l hanging (detecting stdout timeout).
    224       while True:
    225         start_time = datetime.datetime.now()
    226         timeout = datetime.timedelta(seconds=self._symbolizer.addr2line_timeout)
    227 
    228         # The inner loop guards against a2l crashing (checking if it exited).
    229         while (datetime.datetime.now() - start_time < timeout):
    230           # poll() returns !None if the process exited. a2l should never exit.
    231           if self._proc.poll():
    232             logging.warning('addr2line crashed, respawning (lib: %s).' %
    233                             self._lib_file_name)
    234             self._RestartAddr2LineProcess()
    235             # TODO(primiano): the best thing to do in this case would be
    236             # shrinking the pool size as, very likely, addr2line is crashed
    237             # due to low memory (and the respawned one will die again soon).
    238 
    239           try:
    240             lines = self._out_queue.get(block=True, timeout=0.25)
    241           except Queue.Empty:
    242             # On timeout (1/4 s.) repeat the inner loop and check if either the
    243             # addr2line process did crash or we waited its output for too long.
    244             continue
    245 
    246           # In nominal conditions, we get straight to this point.
    247           self._ProcessSymbolOutput(lines)
    248           return
    249 
    250         # If this point is reached, we waited more than |addr2line_timeout|.
    251         logging.warning('Hung addr2line process, respawning (lib: %s).' %
    252                         self._lib_file_name)
    253         self._RestartAddr2LineProcess()
    254 
    255     def ProcessAllResolvedSymbolsInQueue(self):
    256       """Consumes all the addr2line output lines produced (without blocking)."""
    257       if not self.queue_size:
    258         return
    259       while True:
    260         try:
    261           lines = self._out_queue.get_nowait()
    262         except Queue.Empty:
    263           break
    264         self._ProcessSymbolOutput(lines)
    265 
    266     def RecycleIfNecessary(self):
    267       """Restarts the process if it has been used for too long.
    268 
    269       A long running addr2line process will consume excessive amounts
    270       of memory without any gain in performance."""
    271       if self._processed_symbols_count >= ADDR2LINE_RECYCLE_LIMIT:
    272         self._RestartAddr2LineProcess()
    273 
    274 
    275     def Terminate(self):
    276       """Kills the underlying addr2line process.
    277 
    278       The poller |_thread| will terminate as well due to the broken pipe."""
    279       try:
    280         self._proc.kill()
    281         self._proc.communicate()  # Essentially wait() without risking deadlock.
    282       except Exception:  # An exception while terminating? How interesting.
    283         pass
    284       self._proc = None
    285 
    286     def _WriteToA2lStdin(self, addr):
    287       self._proc.stdin.write('%s\n' % hex(addr))
    288       if self._symbolizer.inlines:
    289         # In the case of inlines we output an extra blank line, which causes
    290         # addr2line to emit a (??,??:0) tuple that we use as a boundary marker.
    291         self._proc.stdin.write('\n')
    292       self._proc.stdin.flush()
    293 
    294     def _ProcessSymbolOutput(self, lines):
    295       """Parses an addr2line symbol output and triggers the client callback."""
    296       (_, callback_arg, _) = self._request_queue.popleft()
    297       self.queue_size -= 1
    298 
    299       innermost_sym_info = None
    300       sym_info = None
    301       for (line1, line2) in lines:
    302         prev_sym_info = sym_info
    303         name = line1 if not line1.startswith('?') else None
    304         source_path = None
    305         source_line = None
    306         m = ELFSymbolizer.Addr2Line.SYM_ADDR_RE.match(line2)
    307         if m:
    308           if not m.group(1).startswith('?'):
    309             source_path = m.group(1)
    310             if not m.group(2).startswith('?'):
    311               source_line = int(m.group(2))
    312         else:
    313           logging.warning('Got invalid symbol path from addr2line: %s' % line2)
    314 
    315         sym_info = ELFSymbolInfo(name, source_path, source_line)
    316         if prev_sym_info:
    317           prev_sym_info.inlined_by = sym_info
    318         if not innermost_sym_info:
    319           innermost_sym_info = sym_info
    320 
    321       self._processed_symbols_count += 1
    322       self._symbolizer.callback(innermost_sym_info, callback_arg)
    323 
    324     def _RestartAddr2LineProcess(self):
    325       if self._proc:
    326         self.Terminate()
    327 
    328       # The only reason of existence of this Queue (and the corresponding
    329       # Thread below) is the lack of a subprocess.stdout.poll_avail_lines().
    330       # Essentially this is a pipe able to extract a couple of lines atomically.
    331       self._out_queue = Queue.Queue()
    332 
    333       # Start the underlying addr2line process in line buffered mode.
    334 
    335       cmd = [self._symbolizer.addr2line_path, '--functions', '--demangle',
    336           '--exe=' + self._symbolizer.elf_file_path]
    337       if self._symbolizer.inlines:
    338         cmd += ['--inlines']
    339       self._proc = subprocess.Popen(cmd, bufsize=1, stdout=subprocess.PIPE,
    340           stdin=subprocess.PIPE, stderr=sys.stderr, close_fds=True)
    341 
    342       # Start the poller thread, which simply moves atomically the lines read
    343       # from the addr2line's stdout to the |_out_queue|.
    344       self._thread = threading.Thread(
    345           target=ELFSymbolizer.Addr2Line.StdoutReaderThread,
    346           args=(self._proc.stdout, self._out_queue, self._symbolizer.inlines))
    347       self._thread.daemon = True  # Don't prevent early process exit.
    348       self._thread.start()
    349 
    350       self._processed_symbols_count = 0
    351 
    352       # Replay the pending requests on the new process (only for the case
    353       # of a hung addr2line timing out during the game).
    354       for (addr, _, _) in self._request_queue:
    355         self._WriteToA2lStdin(addr)
    356 
    357     @staticmethod
    358     def StdoutReaderThread(process_pipe, queue, inlines):
    359       """The poller thread fn, which moves the addr2line stdout to the |queue|.
    360 
    361       This is the only piece of code not running on the main thread. It merely
    362       writes to a Queue, which is thread-safe. In the case of inlines, it
    363       detects the ??,??:0 marker and sends the lines atomically, such that the
    364       main thread always receives all the lines corresponding to one symbol in
    365       one shot."""
    366       try:
    367         lines_for_one_symbol = []
    368         while True:
    369           line1 = process_pipe.readline().rstrip('\r\n')
    370           line2 = process_pipe.readline().rstrip('\r\n')
    371           if not line1 or not line2:
    372             break
    373           inline_has_more_lines = inlines and (len(lines_for_one_symbol) == 0 or
    374                                   (line1 != '??' and line2 != '??:0'))
    375           if not inlines or inline_has_more_lines:
    376             lines_for_one_symbol += [(line1, line2)]
    377           if inline_has_more_lines:
    378             continue
    379           queue.put(lines_for_one_symbol)
    380           lines_for_one_symbol = []
    381         process_pipe.close()
    382 
    383       # Every addr2line processes will die at some point, please die silently.
    384       except (IOError, OSError):
    385         pass
    386 
    387     @property
    388     def first_request_id(self):
    389       """Returns the request_id of the oldest pending request in the queue."""
    390       return self._request_queue[0][2] if self._request_queue else 0
    391 
    392 
    393 class ELFSymbolInfo(object):
    394   """The result of the symbolization passed as first arg. of each callback."""
    395 
    396   def __init__(self, name, source_path, source_line):
    397     """All the fields here can be None (if addr2line replies with '??')."""
    398     self.name = name
    399     self.source_path = source_path
    400     self.source_line = source_line
    401     # In the case of |inlines|=True, the |inlined_by| points to the outer
    402     # function inlining the current one (and so on, to form a chain).
    403     self.inlined_by = None
    404 
    405   def __str__(self):
    406     return '%s [%s:%d]' % (
    407         self.name or '??', self.source_path or '??', self.source_line or 0)
    408