Home | History | Annotate | Download | only in cros
      1 # Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging, os, re, utils
      6 
      7 from autotest_lib.client.bin import utils
      8 from autotest_lib.client.common_lib import error
      9 
     10 class KernelTrace(object):
     11     """Allows access and control to Kernel tracing facilities.
     12 
     13     Example code snippet:
     14         trace = KernelTrace(events=['mali_dvfs:mali_dvfs_set_clock'])
     15         results = trace.read(regexp=r'frequency=(\d+)')
     16 
     17     Public methods:
     18         on          : Enables tracing
     19         off         : Disables tracing
     20         is_tracing  : Returns Boolean of tracing status.
     21         event_on    : Turns event on.  Returns boolean of success
     22         event_off   : Turns event off.  Returns boolean of success
     23         flush       : Flushes trace buffer
     24         read        : Reads trace buffer returns list of
     25                       - tuples if regexp provided
     26                       - else matching string
     27         uptime_secs : Returns float of current uptime.
     28 
     29     Private functions:
     30         _onoff       : Disable/enable tracing
     31         _onoff_event : Disable/enable events
     32 
     33     Private attributes:
     34         _buffer      : list to hold parsed results from trace buffer
     35         _buffer_ptr  : integer pointing to last byte read
     36 
     37     TODO(tbroch):  List of potential enhancements
     38        - currently only supports trace events.  Add other tracers.
     39     """
     40     _TRACE_ROOT = '/sys/kernel/debug/tracing'
     41     _TRACE_EN_PATH = os.path.join(_TRACE_ROOT, 'tracing_enabled')
     42 
     43     def __init__(self, flush=True, events=None, on=True):
     44         """Constructor for KernelTrace class"""
     45         self._buffer = []
     46         self._buffer_ptr = 0
     47         self._events = []
     48         self._on = on
     49 
     50         if flush:
     51             self.flush()
     52         for event in events:
     53             if self.event_on(event):
     54                 self._events.append(event)
     55         if on:
     56             self.on()
     57 
     58 
     59     def __del__(self, flush=True, events=None, on=True):
     60         """Deconstructor for KernelTrace class"""
     61         for event in self._events:
     62             self.event_off(event)
     63         if self._on:
     64             self.off()
     65 
     66 
     67     def _onoff(self, val):
     68         """Enable/Disable tracing.
     69 
     70         Arguments:
     71             val: integer, 1 for on, 0 for off
     72 
     73         Raises:
     74             error.TestFail: If unable to enable/disable tracing
     75               boolean of tracing on/off status
     76         """
     77         utils.write_one_line(self._TRACE_EN_PATH, val)
     78         fname = os.path.join(self._TRACE_ROOT, 'tracing_on')
     79         result = int(utils.read_one_line(fname).strip())
     80         if not result == val:
     81             raise error.TestFail("Unable to %sable tracing" %
     82                                  'en' if val == 1 else 'dis')
     83 
     84 
     85     def on(self):
     86         """Enable tracing."""
     87         return self._onoff(1)
     88 
     89 
     90     def off(self):
     91         """Disable tracing."""
     92         self._onoff(0)
     93 
     94 
     95     def is_tracing(self):
     96         """Is tracing on?
     97 
     98         Returns:
     99             True if tracing enabled and at least one event is enabled.
    100         """
    101         fname = os.path.join(self._TRACE_ROOT, 'tracing_on')
    102         result = int(utils.read_one_line(fname).strip())
    103         if result == 1 and len(self._events) > 0:
    104             return True
    105         return False
    106 
    107 
    108     def _event_onoff(self, event, val):
    109         """Enable/Disable tracing event.
    110 
    111         TODO(tbroch) Consider allowing wild card enabling of trace events via
    112             /sys/kernel/debug/tracing/set_event although it makes filling buffer
    113             really easy
    114 
    115         Arguments:
    116             event: list of events.
    117                    See kernel(Documentation/trace/events.txt) for formatting.
    118             val: integer, 1 for on, 0 for off
    119 
    120          Returns:
    121             True if success, false otherwise
    122         """
    123         logging.debug("event_onoff: event:%s val:%d", event, val)
    124         event_path = event.replace(':', '/')
    125         fname = os.path.join(self._TRACE_ROOT, 'events', event_path, 'enable')
    126 
    127         if not os.path.exists(fname):
    128             logging.warning("Unable to locate tracing event %s", fname)
    129             return False
    130         utils.write_one_line(fname, val)
    131 
    132         fname = os.path.join(self._TRACE_ROOT, "set_event")
    133         found = False
    134         with open(fname) as fd:
    135             for ln in fd.readlines():
    136                 logging.debug("set_event ln:%s", ln)
    137                 if re.findall(event, ln):
    138                     found = True
    139                     break
    140 
    141         if val == 1 and not found:
    142             logging.warning("Event %s not enabled", event)
    143             return False
    144 
    145         if val == 0 and found:
    146             logging.warning("Event %s not disabled", event)
    147             return False
    148 
    149         return True
    150 
    151 
    152     def event_on(self, event):
    153         return self._event_onoff(event, 1)
    154 
    155 
    156     def event_off(self, event):
    157         return self._event_onoff(event, 0)
    158 
    159 
    160     def flush(self):
    161         """Flush trace buffer.
    162 
    163         Raises:
    164             error.TestFail: If unable to flush
    165         """
    166         self.off()
    167         fname = os.path.join(self._TRACE_ROOT, 'free_buffer')
    168         utils.write_one_line(fname, 1)
    169         self._buffer_ptr = 0
    170 
    171         fname = os.path.join(self._TRACE_ROOT, 'buffer_size_kb')
    172         result = utils.read_one_line(fname).strip()
    173         if result is '0':
    174             return True
    175         return False
    176 
    177 
    178     def read(self, regexp=None):
    179         fname = os.path.join(self._TRACE_ROOT, 'trace')
    180         fd = open(fname)
    181         fd.seek(self._buffer_ptr)
    182         for ln in fd.readlines():
    183             if regexp is None:
    184                 self._buffer.append(ln)
    185                 continue
    186             results = re.findall(regexp, ln)
    187             if results:
    188                 logging.debug(ln)
    189                 self._buffer.append(results[0])
    190         self._buffer_ptr = fd.tell()
    191         fd.close()
    192         return self._buffer
    193 
    194 
    195     @staticmethod
    196     def uptime_secs():
    197         results = utils.read_one_line("/proc/uptime")
    198         return float(results.split()[0])
    199