# SPDX-License-Identifier: Apache-2.0
#
# Copyright (C) 2015, ARM Limited and contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

""" Trace Parser Module """

import numpy as np
import os
import pandas as pd
import sys
import trappy
import json
import warnings
import operator
import logging

from analysis_register import AnalysisRegister
from collections import namedtuple
from devlib.utils.misc import memoized
from trappy.utils import listify, handle_duplicate_index


NON_IDLE_STATE = -1
ResidencyTime = namedtuple('ResidencyTime', ['total', 'active'])
ResidencyData = namedtuple('ResidencyData', ['label', 'residency'])

class Trace(object):
    """
    The Trace object is the LISA trace events parser.

    :param platform: a dictionary containing information about the target
        platform
    :type platform: dict

    :param data_dir: folder containing all trace data
    :type data_dir: str

    :param events: events to be parsed (everything in the trace by default)
    :type events: list(str)

    :param tasks: filter data for the specified tasks only. If None (default),
        use data for all tasks found in the trace.
    :type tasks: list(str) or NoneType

    :param window: time window to consider when parsing the trace
    :type window: tuple(int, int)

    :param normalize_time: normalize trace time stamps
    :type normalize_time: bool

    :param trace_format: format of the trace. Possible values are:
        - FTrace
        - SysTrace
    :type trace_format: str

    :param plots_dir: directory where plots will be saved
    :type plots_dir: str

    :param plots_prefix: prefix for plots file names
    :type plots_prefix: str

    :param cgroup_info: cgroup information used for sanitization; e.g.::

            {
                'controller_ids': { 2: 'schedtune', 4: 'cpuset' },
                'cgroups': [ 'root', 'background', 'foreground' ],  # allowed cgroup names
            }

    :type cgroup_info: dict
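
    Example (illustrative only; the platform dict, trace path and event
    names below are placeholders, not values shipped with LISA)::

        trace = Trace(platform, '/path/to/trace_dir',
                      events=['sched_switch', 'cpu_frequency'])
        switches = trace.data_frame.trace_event('sched_switch')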
    """

    def __init__(self, platform, data_dir, events=None,
                 tasks=None, window=(0, None),
                 normalize_time=True,
                 trace_format='FTrace',
                 plots_dir=None,
                 plots_prefix='',
                 cgroup_info=None):

        # The platform used to run the experiments
        self.platform = platform

        # TRAPpy Trace object
        self.ftrace = None

        # Trace format
        self.trace_format = trace_format

        # The time window used to limit trace parsing to
        self.window = window

        # Dynamically registered TRAPpy events
        self.trappy_cls = {}

        # Maximum timespan for all collected events
        self.time_range = 0

        # Time the system was overutilized
        self.overutilized_time = 0
        self.overutilized_prc = 0

        # The dictionary of tasks descriptors available in the dataset
        self.tasks = {}

        # List of events required by user
        self.events = []

        # List of events available in the parsed trace
        self.available_events = []

        # Cluster frequency coherency flag
        self.freq_coherency = True

        # Folder containing all trace data
        self.data_dir = None

        # Setup logging
        self._log = logging.getLogger('Trace')

        # Folder containing the trace
        if not os.path.isdir(data_dir):
            self.data_dir = os.path.dirname(data_dir)
        else:
            self.data_dir = data_dir

        # By default, use the trace dir to save plots
        self.plots_dir = plots_dir
        if self.plots_dir is None:
            self.plots_dir = self.data_dir
        self.plots_prefix = plots_prefix

        # Cgroup info for sanitization
        self.cgroup_info = cgroup_info or {}
        if events:
            self.__registerTraceEvents(events)
        self.__parseTrace(data_dir, tasks, window, normalize_time,
                          trace_format)
        self.__computeTimeSpan()

        # Minimum and Maximum x_time to use for all plots
        self.x_min = 0
        self.x_max = self.time_range

        # Reset x axis time range to full scale
        t_min = self.window[0]
        t_max = self.window[1]
        self.setXTimeRange(t_min, t_max)

        self.data_frame = TraceData()
        self._registerDataFrameGetters(self)

        self.analysis = AnalysisRegister(self)

    def _registerDataFrameGetters(self, module):
        """
        Internal utility function that looks up getter functions with a
        "_dfg_" prefix in their name and binds them to the specified module.

        :param module: module to which the function is added
        :type module: class
        """
        self._log.debug('Registering [%s] local data frames', module)
        for func in dir(module):
            if not func.startswith('_dfg_'):
                continue
            dfg_name = func.replace('_dfg_', '')
            dfg_func = getattr(module, func)
            self._log.debug('   %s', dfg_name)
            setattr(self.data_frame, dfg_name, dfg_func)

    def setXTimeRange(self, t_min=None, t_max=None):
        """
        Set x axis time range to the specified values.

        :param t_min: lower bound
        :type t_min: int or float

        :param t_max: upper bound
        :type t_max: int or float
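
        Example (illustrative time bounds)::

            trace.setXTimeRange(0.5, 2.5)   # focus plots on [0.5, 2.5]s
            trace.setXTimeRange()           # reset to the full trace span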
        """
        if t_min is None:
            self.x_min = 0
        else:
            self.x_min = t_min
        if t_max is None:
            self.x_max = self.time_range
        else:
            self.x_max = t_max
        self._log.debug('Set plots time range to (%.6f, %.6f)[s]',
                        self.x_min, self.x_max)

    def __registerTraceEvents(self, events):
        """
        Save a copy of the events to be parsed.

        :param events: single event name or list of events names
        :type events: str or list(str)
        """
        if isinstance(events, basestring):
            self.events = events.split(' ')
        elif isinstance(events, list):
            self.events = events
        else:
            raise ValueError('Events must be a string or a list of strings')
        # Register devlib fake cpu_frequency events
        if 'cpu_frequency' in self.events:
            self.events.append('cpu_frequency_devlib')

    def __parseTrace(self, path, tasks, window, normalize_time, trace_format):
        """
        Internal method in charge of performing the actual parsing of the
        trace.

        :param path: path to the trace folder (or trace file)
        :type path: str

        :param tasks: filter data for the specified tasks only
        :type tasks: list(str)

        :param window: time window to consider when parsing the trace
        :type window: tuple(int, int)

        :param normalize_time: normalize trace time stamps
        :type normalize_time: bool

        :param trace_format: format of the trace. Possible values are:
            - FTrace
            - SysTrace
        :type trace_format: str
        """
        self._log.debug('Loading [sched] events from trace in [%s]...', path)
        self._log.debug('Parsing events: %s', self.events if self.events else 'ALL')
        if trace_format.upper() == 'SYSTRACE' or path.endswith('html'):
            self._log.debug('Parsing SysTrace format...')
            trace_class = trappy.SysTrace
            self.trace_format = 'SysTrace'
        elif trace_format.upper() == 'FTRACE':
            self._log.debug('Parsing FTrace format...')
            trace_class = trappy.FTrace
            self.trace_format = 'FTrace'
        else:
            raise ValueError("Unknown trace format {}".format(trace_format))

        scope = 'custom' if self.events else 'all'
        self.ftrace = trace_class(path, scope=scope, events=self.events,
                                  window=window, normalize_time=normalize_time)

        # Load functions profiling data
        has_function_stats = self._loadFunctionsStats(path)

        # Check for events available on the parsed trace
        self.__checkAvailableEvents()
        if len(self.available_events) == 0:
            if has_function_stats:
                self._log.info('Trace contains only functions stats')
                return
            raise ValueError('The trace does not contain useful events '
                             'nor function stats')

        # Sanitize cgroup info if any
        self._sanitize_CgroupAttachTask()

        # Sanitization is not possible if the platform description is missing
        if self.platform:
            # Setup internal data reference to interesting events/dataframes
            self._sanitize_SchedLoadAvgCpu()
            self._sanitize_SchedLoadAvgTask()
            self._sanitize_SchedCpuCapacity()
            self._sanitize_SchedBoostCpu()
            self._sanitize_SchedBoostTask()
            self._sanitize_SchedEnergyDiff()
            self._sanitize_SchedOverutilized()
            self._sanitize_CpuFrequency()

        self.__loadTasksNames(tasks)

        # Compute plot window
        if not normalize_time:
            start = self.window[0]
            if self.window[1]:
                duration = min(self.ftrace.get_duration(), self.window[1])
            else:
                duration = self.ftrace.get_duration()
            self.window = (self.ftrace.basetime + start,
                           self.ftrace.basetime + duration)

    def __checkAvailableEvents(self, key=""):
        """
        Internal method used to build a list of available events.

        :param key: key to be used for TRAPpy filtering
        :type key: str
        """
        for val in self.ftrace.get_filters(key):
            obj = getattr(self.ftrace, val)
            if len(obj.data_frame):
                self.available_events.append(val)
        self._log.debug('Events found on trace:')
        for evt in self.available_events:
            self._log.debug(' - %s', evt)

    def __loadTasksNames(self, tasks):
        """
        Try to load task names using one of the supported events.

        :param tasks: list of task names. If None, load all tasks found.
        :type tasks: list(str) or NoneType
        """
        def load(tasks, event, name_key, pid_key):
            df = self._dfg_trace_event(event)
            if tasks is None:
                tasks = df[name_key].unique()
            self.getTasks(df, tasks, name_key=name_key, pid_key=pid_key)
            self._scanTasks(df, name_key=name_key, pid_key=pid_key)
            self._scanTgids(df)

        if 'sched_switch' in self.available_events:
            load(tasks, 'sched_switch', 'next_comm', 'next_pid')
        elif 'sched_load_avg_task' in self.available_events:
            load(tasks, 'sched_load_avg_task', 'comm', 'pid')
        else:
            self._log.warning('Failed to load task names from trace events')

    def hasEvents(self, dataset):
        """
        Returns True if the specified event (or, for a list, all of the
        specified events) is present in the parsed trace, False otherwise.

        :param dataset: trace event name or list of trace events
        :type dataset: str or list(str)
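
        Example (illustrative; assumes the event was collected)::

            if trace.hasEvents('sched_switch'):
                switches = trace.data_frame.trace_event('sched_switch')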
        """
        if isinstance(dataset, list):
            return set(dataset).issubset(set(self.available_events))
        return dataset in self.available_events

    def __computeTimeSpan(self):
        """
        Compute time axis range, considering all the parsed events.
        """
        ts = sys.maxint
        te = 0

        for events in self.available_events:
            df = self._dfg_trace_event(events)
            if len(df) == 0:
                continue
            if df.index[0] < ts:
                ts = df.index[0]
            if df.index[-1] > te:
                te = df.index[-1]
        if te > ts:
            self.time_range = te - ts

        self._log.debug('Collected events span a %.3f [s] time interval',
                        self.time_range)

        # Build a stat on trace overutilization
        if self.hasEvents('sched_overutilized'):
            df = self._dfg_trace_event('sched_overutilized')
            self.overutilized_time = df[df.overutilized == 1].len.sum()
            self.overutilized_prc = 100. * self.overutilized_time / self.time_range

            self._log.debug('Overutilized time: %.6f [s] (%.3f%% of trace time)',
                            self.overutilized_time, self.overutilized_prc)

    def _scanTgids(self, df):
        if '__tgid' not in df.columns:
            return
        df = df[['__pid', '__tgid']]
        df = df.drop_duplicates(keep='first').set_index('__pid')
        df.rename(columns={'__pid': 'pid', '__tgid': 'tgid'},
                  inplace=True)
        self._pid_tgid = df

    def _scanTasks(self, df, name_key='comm', pid_key='pid'):
        """
        Extract task names and PIDs from the input data frame. The data frame
        should contain a task name column and a PID column.

        :param df: data frame containing trace events from which task names
            and PIDs will be extracted
        :type df: :mod:`pandas.DataFrame`

        :param name_key: the name of the dataframe column containing task
            names
        :type name_key: str

        :param pid_key: the name of the dataframe column containing task PIDs
        :type pid_key: str
        """
        df = df[[name_key, pid_key]].drop_duplicates()
        self._tasks_by_name = df.set_index(name_key)
        self._tasks_by_pid = df.set_index(pid_key)

    def getTaskByName(self, name):
        """
        Get the PIDs of all tasks with the specified name.

        :param name: task name
        :type name: str
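
        Example (illustrative task name)::

            pids = trace.getTaskByName('kswapd0')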
        """
        if name not in self._tasks_by_name.index:
            return []
        if len(self._tasks_by_name.ix[name].values) > 1:
            return list({task[0] for task in
                         self._tasks_by_name.ix[name].values})
        return [self._tasks_by_name.ix[name].values[0]]

    def getTaskByPid(self, pid):
        """
        Get the names of all tasks with the specified PID.

        :param pid: task PID
        :type pid: int
        """
        if pid not in self._tasks_by_pid.index:
            return []
        if len(self._tasks_by_pid.ix[pid].values) > 1:
            return list({task[0] for task in
                         self._tasks_by_pid.ix[pid].values})
        return [self._tasks_by_pid.ix[pid].values[0]]

    def getTgidFromPid(self, pid):
        """ Get the TGID of the task with the specified PID. """
        return self._pid_tgid.ix[pid].values[0]

    def getTasks(self, dataframe=None,
                 task_names=None, name_key='comm', pid_key='pid'):
        """
        Helper function to get the PIDs of specified tasks.

        This method can take a Pandas dataframe as input and use it to filter
        out the PIDs of all the specified tasks. If a dataframe is not
        provided, previously filtered PIDs are returned.

        If a list of task names is not provided, all tasks detected in the
        trace will be used. The specified dataframe must provide at least two
        columns reporting the task name and the task PID. The names of these
        columns can be specified using the provided parameters.

        :param dataframe: a Pandas dataframe containing at least 'name_key'
            and 'pid_key' columns. If None, all PIDs are returned.
        :type dataframe: :mod:`pandas.DataFrame`

        :param task_names: the list of tasks to get the PIDs of (default: all
            tasks)
        :type task_names: list(str)

        :param name_key: the name of the dataframe column containing task
            names
        :type name_key: str

        :param pid_key: the name of the dataframe column containing task PIDs
        :type pid_key: str
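
        Example (illustrative task names)::

            pids_map = trace.getTasks(task_names=['kswapd0', 'sshd'])
            all_tasks = trace.getTasks()   # previously filtered PIDs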
        """
        if task_names is None:
            task_names = self.tasks.keys()
        if dataframe is None:
            return {k: v for k, v in self.tasks.iteritems() if k in task_names}
        df = dataframe
        self._log.debug('Lookup dataset for tasks...')
        for tname in task_names:
            self._log.debug('Lookup for task [%s]...', tname)
            results = df[df[name_key] == tname][[name_key, pid_key]]
            if len(results) == 0:
                self._log.error('  task %16s NOT found', tname)
                continue
            (name, pid) = results.head(1).values[0]
            if name != tname:
                self._log.error('  task %16s NOT found', tname)
                continue
            if tname not in self.tasks:
                self.tasks[tname] = {}
            pids = list(results[pid_key].unique())
            self.tasks[tname]['pid'] = pids
            self._log.debug('  task %16s found, pid: %s',
                            tname, self.tasks[tname]['pid'])
        return self.tasks


###############################################################################
# DataFrame Getter Methods
###############################################################################

    def df(self, event):
        """
        Get a dataframe containing all occurrences of the specified trace
        event in the parsed trace.

        :param event: trace event name
        :type event: str
        """
        warnings.simplefilter('always', DeprecationWarning)  # turn off filter
        warnings.warn("\n\tUse of Trace::df() is deprecated and will be soon removed."
                      "\n\tUse Trace::data_frame.trace_event(event_name) instead.",
                      category=DeprecationWarning)
        warnings.simplefilter('default', DeprecationWarning)  # reset filter
        return self._dfg_trace_event(event)

    def _dfg_trace_event(self, event):
        """
        Get a dataframe containing all occurrences of the specified trace
        event in the parsed trace.

        :param event: trace event name
        :type event: str
        """
        if self.data_dir is None:
            raise ValueError("trace data not (yet) loaded")
        if self.ftrace and hasattr(self.ftrace, event):
            return getattr(self.ftrace, event).data_frame
        raise ValueError('Event [{}] not supported. '
                         'Supported events are: {}'
                         .format(event, self.available_events))

    def _dfg_functions_stats(self, functions=None):
        """
        Get a DataFrame of specified kernel functions profile data.

        For each profiled function a DataFrame is returned which reports stats
        on kernel functions execution time. The reported stats are per-CPU and
        include: number of times the function has been executed (hits),
        average execution time (avg), overall execution time (time) and
        samples variance (s_2).
        By default a DataFrame of all the profiled functions is returned.

        :param functions: the name of the function or a list of function names
                          to report
        :type functions: str or list(str)
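
        Example (illustrative; assumes this function was profiled)::

            stats = trace.data_frame.functions_stats('pick_next_task_fair')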
        """
        if not hasattr(self, '_functions_stats_df'):
            return None
        df = self._functions_stats_df
        if not functions:
            return df
        return df.loc[df.index.get_level_values(1).isin(listify(functions))]

    # cgroup_attach_task with fake and real events merged
    def _cgroup_attach_task(self):
        cgroup_events = ['cgroup_attach_task', 'cgroup_attach_task_devlib']
        df = None

        if set(cgroup_events).isdisjoint(set(self.available_events)):
            self._log.error('at least one of {} is needed for cgroup_attach_task event generation'.format(cgroup_events))
            return None

        for cev in cgroup_events:
            if cev not in self.available_events:
                continue
            cdf = self._dfg_trace_event(cev)
            cdf = cdf[['__line', 'pid', 'controller', 'cgroup']]
            if not isinstance(df, pd.DataFrame):
                df = cdf
            else:
                df = pd.concat([cdf, df])

        # Always drop na since this DF is used as secondary
        df.dropna(inplace=True, how='any')
        return df

    @memoized
    def _dfg_cgroup_attach_task(self, controllers=['schedtune', 'cpuset']):
        # Since fork doesn't result in attach events, generate fake attach
        # events. The below mechanism doesn't propagate through nested fork
        # levels. For example:
        # cgroup_attach_task: pid=1166
        # fork: pid=1166 child_pid=2222  <-- fake attach generated
        # fork: pid=2222 child_pid=3333  <-- fake attach not generated
        def fork_add_cgroup(fdf, cdf, controller):
            cdf = cdf[cdf['controller'] == controller]
            ret_df = trappy.utils.merge_dfs(fdf, cdf, pivot='pid')
            return ret_df

        if 'sched_process_fork' not in self.available_events:
            self._log.error('sched_process_fork is mandatory to get proper cgroup_attach events')
            return None
        fdf = self._dfg_trace_event('sched_process_fork')

        forks_len = len(fdf)
        forkdf = fdf
        cdf = self._cgroup_attach_task()
        for idx, c in enumerate(controllers):
            fdf = fork_add_cgroup(fdf, cdf, c)
            if idx != (len(controllers) - 1):
                fdf = pd.concat([fdf, forkdf]).sort_values(by='__line')

        fdf = fdf[['__line', 'child_pid', 'controller', 'cgroup']]
        fdf.rename(columns={'child_pid': 'pid'}, inplace=True)

        # Always drop na since this DF is used as secondary
        fdf.dropna(inplace=True, how='any')

        new_forks_len = len(fdf) / len(controllers)

        fdf = pd.concat([fdf, cdf]).sort_values(by='__line')

        if new_forks_len < forks_len:
            dropped = forks_len - new_forks_len
            self._log.info("Couldn't attach cgroups to all forks with attach events ({} dropped)".format(dropped))
        return fdf

    @memoized
    def _dfg_sched_switch_cgroup(self, controllers=['schedtune', 'cpuset']):
        def sched_switch_add_cgroup(sdf, cdf, controller, direction):
            cdf = cdf[cdf['controller'] == controller]

            ret_df = sdf.rename(columns={direction + '_pid': 'pid'})
            ret_df = trappy.utils.merge_dfs(ret_df, cdf, pivot='pid')
            ret_df.rename(columns={'pid': direction + '_pid'}, inplace=True)

            ret_df.drop('controller', axis=1, inplace=True)
            ret_df.rename(columns={'cgroup': direction + '_' + controller},
                          inplace=True)
            return ret_df

        if 'sched_switch' not in self.available_events:
            self._log.error('sched_switch is mandatory to generate the sched_switch_cgroup event')
            return None
        sdf = self._dfg_trace_event('sched_switch')
        cdf = self._dfg_cgroup_attach_task()

        for c in controllers:
            sdf = sched_switch_add_cgroup(sdf, cdf, c, 'next')
            sdf = sched_switch_add_cgroup(sdf, cdf, c, 'prev')

        # Augment with TGID information
        sdf = sdf.join(self._pid_tgid, on='next_pid').rename(
            columns={'tgid': 'next_tgid'})
        sdf = sdf.join(self._pid_tgid, on='prev_pid').rename(
            columns={'tgid': 'prev_tgid'})

        df = self._tasks_by_pid.rename(columns={'next_comm': 'comm'})
        sdf = sdf.join(df, on='next_tgid').rename(
            columns={'comm': 'next_tgid_comm'})
        sdf = sdf.join(df, on='prev_tgid').rename(
            columns={'comm': 'prev_tgid_comm'})
        return sdf

###############################################################################
# Trace Events Sanitize Methods
###############################################################################

    def _sanitize_SchedCpuCapacity(self):
        """
        Add more columns to the cpu_capacity data frame if the energy model is
        available.
        """
        if not self.hasEvents('cpu_capacity') \
           or 'nrg_model' not in self.platform:
            return

        df = self._dfg_trace_event('cpu_capacity')

        # Add column with LITTLE and big CPUs max capacities
        nrg_model = self.platform['nrg_model']
        max_lcap = nrg_model['little']['cpu']['cap_max']
        max_bcap = nrg_model['big']['cpu']['cap_max']
        df['max_capacity'] = np.select(
                [df.cpu.isin(self.platform['clusters']['little'])],
                [max_lcap], max_bcap)
        # Add LITTLE and big CPUs "tipping point" threshold
        tip_lcap = 0.8 * max_lcap
        tip_bcap = 0.8 * max_bcap
        df['tip_capacity'] = np.select(
                [df.cpu.isin(self.platform['clusters']['little'])],
                [tip_lcap], tip_bcap)

    def _sanitize_SchedLoadAvgCpu(self):
        """
        If necessary, rename certain signal names from v5.0 to v5.1 format.
        """
        if not self.hasEvents('sched_load_avg_cpu'):
            return
        df = self._dfg_trace_event('sched_load_avg_cpu')
        if 'utilization' in df:
            df.rename(columns={'utilization': 'util_avg'}, inplace=True)
            df.rename(columns={'load': 'load_avg'}, inplace=True)

    def _sanitize_SchedLoadAvgTask(self):
        """
        If necessary, rename certain signal names from v5.0 to v5.1 format.
        """
        if not self.hasEvents('sched_load_avg_task'):
            return
        df = self._dfg_trace_event('sched_load_avg_task')
        if 'utilization' in df:
            df.rename(columns={'utilization': 'util_avg'}, inplace=True)
            df.rename(columns={'load': 'load_avg'}, inplace=True)
            df.rename(columns={'avg_period': 'period_contrib'}, inplace=True)
            df.rename(columns={'runnable_avg_sum': 'load_sum'}, inplace=True)
            df.rename(columns={'running_avg_sum': 'util_sum'}, inplace=True)
        df['cluster'] = np.select(
                [df.cpu.isin(self.platform['clusters']['little'])],
                ['LITTLE'], 'big')
        # Add a column which represents the max capacity of the smallest
        # cluster which can accommodate the task utilization
        little_cap = self.platform['nrg_model']['little']['cpu']['cap_max']
        big_cap = self.platform['nrg_model']['big']['cpu']['cap_max']
        df['min_cluster_cap'] = df.util_avg.map(
            lambda util_avg: big_cap if util_avg > little_cap else little_cap
        )

    def _sanitize_SchedBoostCpu(self):
        """
        Add a boosted utilization signal as the sum of utilization and margin.

        Also, if necessary, rename certain signal names from v5.0 to v5.1
        format.
        """
        if not self.hasEvents('sched_boost_cpu'):
            return
        df = self._dfg_trace_event('sched_boost_cpu')
        if 'usage' in df:
            df.rename(columns={'usage': 'util'}, inplace=True)
        df['boosted_util'] = df['util'] + df['margin']

    def _sanitize_SchedBoostTask(self):
        """
        Add a boosted utilization signal as the sum of utilization and margin.

        Also, if necessary, rename certain signal names from v5.0 to v5.1
        format.
        """
        if not self.hasEvents('sched_boost_task'):
            return
        df = self._dfg_trace_event('sched_boost_task')
        if 'utilization' in df:
            # Convert signal names to the v5.1 format
            df.rename(columns={'utilization': 'util'}, inplace=True)
        df['boosted_util'] = df['util'] + df['margin']

    def _sanitize_SchedEnergyDiff(self):
        """
        If an energy model is provided, some signals are added to the
        sched_energy_diff trace event data frame.

        Also convert between existing field name formats for
        sched_energy_diff.
        """
        if not self.hasEvents('sched_energy_diff') \
           or 'nrg_model' not in self.platform:
            return
        nrg_model = self.platform['nrg_model']
        em_lcluster = nrg_model['little']['cluster']
        em_bcluster = nrg_model['big']['cluster']
        em_lcpu = nrg_model['little']['cpu']
        em_bcpu = nrg_model['big']['cpu']
        lcpus = len(self.platform['clusters']['little'])
        bcpus = len(self.platform['clusters']['big'])
        SCHED_LOAD_SCALE = 1024

        power_max = em_lcpu['nrg_max'] * lcpus + em_bcpu['nrg_max'] * bcpus + \
            em_lcluster['nrg_max'] + em_bcluster['nrg_max']
        self._log.debug(
            "Maximum estimated system energy: {0:d}".format(power_max))

        df = self._dfg_trace_event('sched_energy_diff')

        translations = {'nrg_d': 'nrg_diff',
                        'utl_d': 'usage_delta',
                        'payoff': 'nrg_payoff'}
        df.rename(columns=translations, inplace=True)

        df['nrg_diff_pct'] = SCHED_LOAD_SCALE * df.nrg_diff / power_max

        # Tag columns by usage_delta
        ccol = df.usage_delta
        df['usage_delta_group'] = np.select(
            [ccol < 150, ccol < 400, ccol < 600],
            ['< 150', '< 400', '< 600'], '>= 600')

        # Tag columns by nrg_payoff
        ccol = df.nrg_payoff
        df['nrg_payoff_group'] = np.select(
            [ccol > 2e9, ccol > 0, ccol > -2e9],
            ['Optimal Accept', 'SchedTune Accept', 'SchedTune Reject'],
            'Suboptimal Reject')

    def _sanitize_SchedOverutilized(self):
        """ Add a column with overutilized status duration. """
        if not self.hasEvents('sched_overutilized'):
            return
        df = self._dfg_trace_event('sched_overutilized')
        df['start'] = df.index
        df['len'] = (df.start - df.start.shift()).fillna(0).shift(-1)
        df.drop('start', axis=1, inplace=True)

    # Sanitize cgroup information helper
    def _helper_sanitize_CgroupAttachTask(self, df, allowed_cgroups, controller_id_name):
        # Drop rows that aren't in the root-id -> name map
        df = df[df['dst_root'].isin(controller_id_name.keys())]

        def get_cgroup_name(path, valid_names):
            name = os.path.basename(path)
            name = 'root' if name not in valid_names else name
            return name

        def get_cgroup_names(rows):
            ret = []
            for r in rows.iterrows():
                ret.append(get_cgroup_name(r[1]['dst_path'], allowed_cgroups))
            return ret

        def get_controller_names(rows):
            ret = []
            for r in rows.iterrows():
                ret.append(controller_id_name[r[1]['dst_root']])
            return ret

        # Sanitize cgroup names.
        # The cgroup column isn't in mainline, so add it in; it's already
        # added for some out-of-tree kernels, so check first.
        if 'cgroup' not in df.columns:
            if 'dst_path' not in df.columns:
                raise RuntimeError("Can't sanitize cgroup DF, need dst_path")
            df = df.assign(cgroup=get_cgroup_names)

        # Sanitize controller names
        if 'controller' not in df.columns:
            if 'dst_root' not in df.columns:
                raise RuntimeError("Can't sanitize cgroup DF, need dst_root")
            df = df.assign(controller=get_controller_names)

        return df

    def _sanitize_CgroupAttachTask(self):
        def sanitize_cgroup_event(name):
            if name not in self.available_events:
                return

            df = self._dfg_trace_event(name)

            if len(df.groupby(level=0).filter(lambda x: len(x) > 1)) > 0:
                self._log.warning('Timestamp collisions seen in {} event!'.format(name))

            df = self._helper_sanitize_CgroupAttachTask(
                df, self.cgroup_info['cgroups'],
                self.cgroup_info['controller_ids'])
            getattr(self.ftrace, name).data_frame = df

        sanitize_cgroup_event('cgroup_attach_task')
        sanitize_cgroup_event('cgroup_attach_task_devlib')

    def _chunker(self, seq, size):
        """
        Given a data frame or a series, generate a sequence of chunks of the
        given size.

        :param seq: data to be split into chunks
        :type seq: :mod:`pandas.Series` or :mod:`pandas.DataFrame`

        :param size: size of each chunk
        :type size: int
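
        Example (illustrative; df is any dataframe, split into 4-row
        chunks)::

            for chunk in self._chunker(df, 4):
                f = chunk.iloc[0].frequency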
        """
        return (seq.iloc[pos:pos + size] for pos in range(0, len(seq), size))

    def _sanitize_CpuFrequency(self):
        """
        Verify that all platform reported clusters are frequency coherent
        (i.e. frequency scaling is performed at a cluster level).
        """
        if not self.hasEvents('cpu_frequency_devlib'):
            return

        devlib_freq = self._dfg_trace_event('cpu_frequency_devlib')
        devlib_freq.rename(columns={'cpu_id': 'cpu'}, inplace=True)
        devlib_freq.rename(columns={'state': 'frequency'}, inplace=True)

        df = self._dfg_trace_event('cpu_frequency')
        clusters = self.platform['clusters']

        # devlib always introduces fake cpu_frequency events; in case the
        # OS has not generated cpu_frequency events, these are the only
        # frequency events to report
        if len(df) == 0:
            # Register devlib injected events as 'cpu_frequency' events
            setattr(self.ftrace.cpu_frequency, 'data_frame', devlib_freq)
            df = devlib_freq
            self.available_events.append('cpu_frequency')

        # make sure fake cpu_frequency events are never interleaved with
        # OS generated events
        else:
            if len(devlib_freq) > 0:

                # Frequency injection is done on a per-cluster basis, under
                # the assumption that clusters are frequency coherent.
                # For each cluster we inject devlib events only if they do
                # not overlap with OS-generated ones.

                # Inject "initial" devlib frequencies
                os_df = df
                dl_df = devlib_freq.iloc[:self.platform['cpus_count']]
                for _, c in self.platform['clusters'].iteritems():
                    dl_freqs = dl_df[dl_df.cpu.isin(c)]
                    os_freqs = os_df[os_df.cpu.isin(c)]
                    self._log.debug("First freqs for %s:\n%s", c, dl_freqs)
                    # All devlib events "before" os-generated events
                    self._log.debug("Min os freq @: %s", os_freqs.index.min())
                    if os_freqs.empty or \
                       os_freqs.index.min() > dl_freqs.index.max():
                        self._log.debug("Insert devlib freqs for %s", c)
                        df = pd.concat([dl_freqs, df])

                # Inject "final" devlib frequencies
                os_df = df
                dl_df = devlib_freq.iloc[self.platform['cpus_count']:]
                for _, c in self.platform['clusters'].iteritems():
                    dl_freqs = dl_df[dl_df.cpu.isin(c)]
                    os_freqs = os_df[os_df.cpu.isin(c)]
                    self._log.debug("Last freqs for %s:\n%s", c, dl_freqs)
                    # All devlib events "after" os-generated events
                    self._log.debug("Max os freq @: %s", os_freqs.index.max())
                    if os_freqs.empty or \
                       os_freqs.index.max() < dl_freqs.index.min():
                        self._log.debug("Append devlib freqs for %s", c)
                        df = pd.concat([df, dl_freqs])

                df.sort_index(inplace=True)

            setattr(self.ftrace.cpu_frequency, 'data_frame', df)

        # Frequency Coherency Check
        for _, cpus in clusters.iteritems():
            cluster_df = df[df.cpu.isin(cpus)]
            for chunk in self._chunker(cluster_df, len(cpus)):
                f = chunk.iloc[0].frequency
                if any(chunk.frequency != f):
                    self._log.warning('Cluster frequency is not coherent! '
                                      'Failure in [cpu_frequency] events at:')
                    self._log.warning(chunk)
                    self.freq_coherency = False
                    return
        self._log.info('Platform clusters verified to be frequency coherent')

###############################################################################
# Utility Methods
###############################################################################

    def integrate_square_wave(self, sq_wave):
        """
        Compute the integral of a square wave time series.

        :param sq_wave: square wave assuming only 1.0 and 0.0 values
        :type sq_wave: :mod:`pandas.Series`
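
        Example (illustrative; the signal is 1.0 between t=0.1-0.3 and
        t=0.5-0.6, so the integral is 0.2 + 0.1)::

            sig = pd.Series([1.0, 0.0, 1.0, 0.0],
                            index=[0.1, 0.3, 0.5, 0.6])
            trace.integrate_square_wave(sig)   # ~0.3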
        """
        sq_wave.iloc[-1] = 0.0
        # Compact signal to obtain only 1-0-1-0 sequences
        comp_sig = sq_wave.loc[sq_wave.shift() != sq_wave]
        # First value for computing the difference must be a 1
        if comp_sig.iloc[0] == 0.0:
            return sum(comp_sig.iloc[2::2].index - comp_sig.iloc[1:-1:2].index)
        else:
            return sum(comp_sig.iloc[1::2].index - comp_sig.iloc[:-1:2].index)

    def _loadFunctionsStats(self, path='trace.stats'):
        """
        Read the functions profiling file and build a data frame containing
        all relevant data.

        :param path: path to the functions profiling trace file
        :type path: str
        """
        if os.path.isdir(path):
            path = os.path.join(path, 'trace.stats')
        if path.endswith('dat') or path.endswith('html'):
            pre, ext = os.path.splitext(path)
            path = pre + '.stats'
        if not os.path.isfile(path):
            return False

        # Open the functions profiling JSON data file
        self._log.debug('Loading functions profiling data from [%s]...', path)
        with open(os.path.join(path), 'r') as fh:
            trace_stats = json.load(fh)

        # Build DataFrame of function stats
        frames = {}
        for cpu, data in trace_stats.iteritems():
            frames[int(cpu)] = pd.DataFrame.from_dict(data, orient='index')

        # Build and keep track of the DataFrame
        self._functions_stats_df = pd.concat(frames.values(),
                                             keys=frames.keys())

        return len(self._functions_stats_df) > 0

    @memoized
    def getCPUActiveSignal(self, cpu):
        """
        Build a square wave representing the active (i.e. non-idle) CPU time,
        i.e.:

          cpu_active[t] == 1 if the CPU is reported to be non-idle by cpuidle
          at time t
          cpu_active[t] == 0 otherwise

        :param cpu: CPU ID
        :type cpu: int

        :returns: A :mod:`pandas.Series` or ``None`` if the trace contains no
                  "cpu_idle" events
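
        Example (illustrative; combines this getter with
        :meth:`integrate_square_wave` to compute busy time)::

            active = trace.getCPUActiveSignal(0)
            if active is not None:
                busy_time = trace.integrate_square_wave(active)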
        """
        if not self.hasEvents('cpu_idle'):
            self._log.warning('Events [cpu_idle] not found, '
                              'cannot compute CPU active signal!')
            return None

        idle_df = self._dfg_trace_event('cpu_idle')
        cpu_df = idle_df[idle_df.cpu_id == cpu]

        cpu_active = cpu_df.state.apply(
            lambda s: 1 if s == NON_IDLE_STATE else 0
        )

        start_time = 0.0
        if not self.ftrace.normalized_time:
            start_time = self.ftrace.basetime

        if cpu_active.empty:
            cpu_active = pd.Series([0], index=[start_time])
        elif cpu_active.index[0] != start_time:
            entry_0 = pd.Series(cpu_active.iloc[0] ^ 1, index=[start_time])
            cpu_active = pd.concat([entry_0, cpu_active])

        # Fix sequences of wakeup/sleep events reported with the same index
        return handle_duplicate_index(cpu_active)

    @memoized
    def getClusterActiveSignal(self, cluster):
        """
        Build a square wave representing the active (i.e. non-idle) cluster
        time, i.e.:

          cluster_active[t] == 1 if at least one CPU is reported to be
          non-idle by cpuidle at time t
          cluster_active[t] == 0 otherwise

        :param cluster: list of CPU IDs belonging to a cluster
        :type cluster: list(int)

        :returns: A :mod:`pandas.Series` or ``None`` if the trace contains no
                  "cpu_idle" events
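
        Example (illustrative; assumes CPUs 0-3 form one cluster)::

            cl_active = trace.getClusterActiveSignal([0, 1, 2, 3])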
        """
        if not self.hasEvents('cpu_idle'):
            self._log.warning('Events [cpu_idle] not found, '
                              'cannot compute cluster active signal!')
            return None

        active = self.getCPUActiveSignal(cluster[0]).to_frame(name=cluster[0])
        for cpu in cluster[1:]:
            active = active.join(
                self.getCPUActiveSignal(cpu).to_frame(name=cpu),
                how='outer'
            )

        active.fillna(method='ffill', inplace=True)

        # Cluster active is the OR between the actives on each CPU
        # belonging to that specific cluster
        cluster_active = reduce(
            operator.or_,
            [cpu_active.astype(int) for _, cpu_active in
             active.iteritems()]
        )

        return cluster_active


class TraceData:
    """ A DataFrame collector exposed to Trace's clients """
    pass

# vim :set tabstop=4 shiftwidth=4 expandtab