# SPDX-License-Identifier: Apache-2.0
#
# Copyright (C) 2015, ARM Limited and contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

""" Trace Parser Module """

import numpy as np
import os
import pandas as pd
import sys
import trappy
import json
import warnings
import operator
import logging

from analysis_register import AnalysisRegister
from collections import namedtuple
from devlib.utils.misc import memoized
from trappy.utils import listify, handle_duplicate_index


NON_IDLE_STATE = -1
ResidencyTime = namedtuple('ResidencyTime', ['total', 'active'])
ResidencyData = namedtuple('ResidencyData', ['label', 'residency'])


class Trace(object):
    """
    The Trace object is the LISA trace events parser.

    :param platform: a dictionary containing information about the target
        platform
    :type platform: dict

    :param data_dir: folder containing all trace data
    :type data_dir: str

    :param events: events to be parsed (everything in the trace by default)
    :type events: list(str)

    :param tasks: filter data for the specified tasks only. If None (default),
        use data for all tasks found in the trace.
    :type tasks: list(str) or NoneType

    :param window: time window to consider when parsing the trace
    :type window: tuple(int, int)

    :param normalize_time: normalize trace time stamps
    :type normalize_time: bool

    :param trace_format: format of the trace. Possible values are:
        - FTrace
        - SysTrace
    :type trace_format: str

    :param plots_dir: directory where to save plots
    :type plots_dir: str

    :param plots_prefix: prefix for plots file names
    :type plots_prefix: str

    :param cgroup_info: add cgroup information for sanitization
        example:
        {
            'controller_ids': { 2: 'schedtune', 4: 'cpuset' },
            'cgroups': [ 'root', 'background', 'foreground' ],  # list of allowed cgroup names
        }
    :type cgroup_info: dict
    """
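    # Typical usage (a minimal sketch; the trace path, platform dictionary and
    # event list below are illustrative only):
    #
    #   trace = Trace(platform, '/path/to/trace_dir',
    #                 events=['sched_switch', 'cpu_idle', 'cpu_frequency'])
    #   switches = trace.data_frame.trace_event('sched_switch')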
    def __init__(self, platform, data_dir, events=None,
                 tasks=None, window=(0, None),
                 normalize_time=True,
                 trace_format='FTrace',
                 plots_dir=None,
                 plots_prefix='',
                 cgroup_info={}):

        # The platform used to run the experiments
        self.platform = platform

        # TRAPpy Trace object
        self.ftrace = None

        # Trace format
        self.trace_format = trace_format

        # The time window used to limit trace parsing to
        self.window = window

        # Dynamically registered TRAPpy events
        self.trappy_cls = {}

        # Maximum timespan for all collected events
        self.time_range = 0

        # Time the system was overutilized
        self.overutilized_time = 0
        self.overutilized_prc = 0

        # The dictionary of tasks descriptors available in the dataset
        self.tasks = {}

        # List of events required by user
        self.events = []

        # List of events available in the parsed trace
        self.available_events = []

        # Cluster frequency coherency flag
        self.freq_coherency = True

        # Folder containing all trace data
        self.data_dir = None

        # Setup logging
        self._log = logging.getLogger('Trace')

        # Folder containing trace
        if not os.path.isdir(data_dir):
            self.data_dir = os.path.dirname(data_dir)
        else:
            self.data_dir = data_dir

        # By default, use the trace dir to save plots
        self.plots_dir = plots_dir
        if self.plots_dir is None:
            self.plots_dir = self.data_dir
        self.plots_prefix = plots_prefix

        # Cgroup info for sanitization
        self.cgroup_info = cgroup_info

        if events:
            self.__registerTraceEvents(events)
        self.__parseTrace(data_dir, tasks, window, normalize_time,
                          trace_format)
        self.__computeTimeSpan()

        # Minimum and Maximum x_time to use for all plots
        self.x_min = 0
        self.x_max = self.time_range

        # Reset x axis time range to full scale
        t_min = self.window[0]
        t_max = self.window[1]
        self.setXTimeRange(t_min, t_max)

        self.data_frame = TraceData()
        self._registerDataFrameGetters(self)

        self.analysis = AnalysisRegister(self)

    def _registerDataFrameGetters(self, module):
        """
        Internal utility function that looks up getter functions with a
        "_dfg_" prefix in their name and binds them to the specified module.

        :param module: module to which the function is added
        :type module: class
        """
        self._log.debug('Registering [%s] local data frames', module)
        for func in dir(module):
            if not func.startswith('_dfg_'):
                continue
            dfg_name = func.replace('_dfg_', '')
            dfg_func = getattr(module, func)
            self._log.debug('   %s', dfg_name)
            setattr(self.data_frame, dfg_name, dfg_func)

    def setXTimeRange(self, t_min=None, t_max=None):
        """
        Set x axis time range to the specified values.

        :param t_min: lower bound
        :type t_min: int or float

        :param t_max: upper bound
        :type t_max: int or float
        """
        if t_min is None:
            self.x_min = 0
        else:
            self.x_min = t_min
        if t_max is None:
            self.x_max = self.time_range
        else:
            self.x_max = t_max
        self._log.debug('Set plots time range to (%.6f, %.6f)[s]',
                        self.x_min, self.x_max)
    def __registerTraceEvents(self, events):
        """
        Save a copy of the parsed events.

        :param events: single event name or list of events names
        :type events: str or list(str)
        """
        if isinstance(events, basestring):
            self.events = events.split(' ')
        elif isinstance(events, list):
            self.events = events
        else:
            raise ValueError('Events must be a string or a list of strings')
        # Register devlib fake cpu_frequency events
        if 'cpu_frequency' in events:
            self.events.append('cpu_frequency_devlib')

    def __parseTrace(self, path, tasks, window, normalize_time, trace_format):
        """
        Internal method in charge of performing the actual parsing of the
        trace.

        :param path: path to the trace folder (or trace file)
        :type path: str

        :param tasks: filter data for the specified tasks only
        :type tasks: list(str)

        :param window: time window to consider when parsing the trace
        :type window: tuple(int, int)

        :param normalize_time: normalize trace time stamps
        :type normalize_time: bool

        :param trace_format: format of the trace. Possible values are:
            - FTrace
            - SysTrace
        :type trace_format: str
        """
        self._log.debug('Loading [sched] events from trace in [%s]...', path)
        self._log.debug('Parsing events: %s',
                        self.events if self.events else 'ALL')
        if trace_format.upper() == 'SYSTRACE' or path.endswith('html'):
            self._log.debug('Parsing SysTrace format...')
            trace_class = trappy.SysTrace
            self.trace_format = 'SysTrace'
        elif trace_format.upper() == 'FTRACE':
            self._log.debug('Parsing FTrace format...')
            trace_class = trappy.FTrace
            self.trace_format = 'FTrace'
        else:
            raise ValueError("Unknown trace format {}".format(trace_format))

        scope = 'custom' if self.events else 'all'
        self.ftrace = trace_class(path, scope=scope, events=self.events,
                                  window=window, normalize_time=normalize_time)

        # Load Functions profiling data
        has_function_stats = self._loadFunctionsStats(path)

        # Check for events available on the parsed trace
        self.__checkAvailableEvents()
        if len(self.available_events) == 0:
            if has_function_stats:
                self._log.info('Trace contains only functions stats')
                return
            raise ValueError('The trace does not contain useful events '
                             'nor function stats')

        # Sanitize cgroup info if any
        self._sanitize_CgroupAttachTask()

        # Sanitization is not possible if platform data is missing
        if self.platform:
            # Setup internal data reference to interesting events/dataframes
            self._sanitize_SchedLoadAvgCpu()
            self._sanitize_SchedLoadAvgTask()
            self._sanitize_SchedCpuCapacity()
            self._sanitize_SchedBoostCpu()
            self._sanitize_SchedBoostTask()
            self._sanitize_SchedEnergyDiff()
            self._sanitize_SchedOverutilized()
            self._sanitize_CpuFrequency()

        self.__loadTasksNames(tasks)

        # Compute plot window
        if not normalize_time:
            start = self.window[0]
            if self.window[1]:
                duration = min(self.ftrace.get_duration(), self.window[1])
            else:
                duration = self.ftrace.get_duration()
            self.window = (self.ftrace.basetime + start,
                           self.ftrace.basetime + duration)
    def __checkAvailableEvents(self, key=""):
        """
        Internal method used to build a list of available events.

        :param key: key to be used for TRAPpy filtering
        :type key: str
        """
        for val in self.ftrace.get_filters(key):
            obj = getattr(self.ftrace, val)
            if len(obj.data_frame):
                self.available_events.append(val)
        self._log.debug('Events found on trace:')
        for evt in self.available_events:
            self._log.debug(' - %s', evt)

    def __loadTasksNames(self, tasks):
        """
        Try to load tasks names using one of the supported events.

        :param tasks: list of task names. If None, load all tasks found.
        :type tasks: list(str) or NoneType
        """
        def load(tasks, event, name_key, pid_key):
            df = self._dfg_trace_event(event)
            if tasks is None:
                tasks = df[name_key].unique()
            self.getTasks(df, tasks, name_key=name_key, pid_key=pid_key)
            self._scanTasks(df, name_key=name_key, pid_key=pid_key)
            self._scanTgids(df)

        if 'sched_switch' in self.available_events:
            load(tasks, 'sched_switch', 'next_comm', 'next_pid')
        elif 'sched_load_avg_task' in self.available_events:
            load(tasks, 'sched_load_avg_task', 'comm', 'pid')
        else:
            self._log.warning('Failed to load tasks names from trace events')

    def hasEvents(self, dataset):
        """
        Returns True if the specified event is present in the parsed trace,
        False otherwise.

        :param dataset: trace event name or list of trace events
        :type dataset: str or list(str)
        """
        if dataset in self.available_events:
            return True
        return False

    def __computeTimeSpan(self):
        """
        Compute time axis range, considering all the parsed events.
        """
        ts = sys.maxint
        te = 0

        for events in self.available_events:
            df = self._dfg_trace_event(events)
            if len(df) == 0:
                continue
            if (df.index[0]) < ts:
                ts = df.index[0]
            if (df.index[-1]) > te:
                te = df.index[-1]
            self.time_range = te - ts

        self._log.debug('Collected events span a %.3f [s] time interval',
                        self.time_range)

        # Build a stat on trace overutilization
        if self.hasEvents('sched_overutilized'):
            df = self._dfg_trace_event('sched_overutilized')
            self.overutilized_time = df[df.overutilized == 1].len.sum()
            self.overutilized_prc = 100. * self.overutilized_time / self.time_range

            self._log.debug('Overutilized time: %.6f [s] (%.3f%% of trace time)',
                            self.overutilized_time, self.overutilized_prc)

    def _scanTgids(self, df):
        if not '__tgid' in df.columns:
            return
        df = df[['__pid', '__tgid']]
        df = df.drop_duplicates(keep='first').set_index('__pid')
        df.rename(columns = { '__pid': 'pid', '__tgid': 'tgid' },
                  inplace=True)
        self._pid_tgid = df

    def _scanTasks(self, df, name_key='comm', pid_key='pid'):
        """
        Extract tasks names and PIDs from the input data frame. The data frame
        should contain a task name column and a PID column.

        :param df: data frame containing trace events from which tasks names
            and PIDs will be extracted
        :type df: :mod:`pandas.DataFrame`

        :param name_key: The name of the dataframe column containing task
            names
        :type name_key: str

        :param pid_key: The name of the dataframe column containing task PIDs
        :type pid_key: str
        """
        df = df[[name_key, pid_key]].drop_duplicates()
        self._tasks_by_name = df.set_index(name_key)
        self._tasks_by_pid = df.set_index(pid_key)
    def getTaskByName(self, name):
        """
        Get the PIDs of all tasks with the specified name.

        :param name: task name
        :type name: str
        """
        if name not in self._tasks_by_name.index:
            return []
        if len(self._tasks_by_name.ix[name].values) > 1:
            return list({task[0] for task in
                         self._tasks_by_name.ix[name].values})
        return [self._tasks_by_name.ix[name].values[0]]

    def getTaskByPid(self, pid):
        """
        Get the names of all tasks with the specified PID.

        :param pid: task PID
        :type pid: int
        """
        if pid not in self._tasks_by_pid.index:
            return []
        if len(self._tasks_by_pid.ix[pid].values) > 1:
            return list({task[0] for task in
                         self._tasks_by_pid.ix[pid].values})
        return [self._tasks_by_pid.ix[pid].values[0]]

    def getTgidFromPid(self, pid):
        return self._pid_tgid.ix[pid].values[0]

    def getTasks(self, dataframe=None,
                 task_names=None, name_key='comm', pid_key='pid'):
        """
        Helper function to get PIDs of specified tasks.

        This method can take a Pandas dataframe in input, which is used to
        filter out the PIDs of all the specified tasks. If a dataframe is not
        provided, previously filtered PIDs are returned.

        If a list of task names is not provided, all tasks detected in the
        trace will be used. The specified dataframe must provide at least two
        columns reporting the task name and the task PID. The default names of
        these columns can be specified using the provided parameters.

        :param dataframe: A Pandas dataframe containing at least 'name_key'
            and 'pid_key' columns. If None, previously filtered PIDs are
            returned.
        :type dataframe: :mod:`pandas.DataFrame`

        :param task_names: The list of tasks to get the PID of (default: all
            tasks)
        :type task_names: list(str)

        :param name_key: The name of the dataframe column containing task
            names
        :type name_key: str

        :param pid_key: The name of the dataframe column containing task PIDs
        :type pid_key: str
        """
        if task_names is None:
            task_names = self.tasks.keys()
        if dataframe is None:
            return {k: v for k, v in self.tasks.iteritems() if k in task_names}
        df = dataframe
        self._log.debug('Lookup dataset for tasks...')
        for tname in task_names:
            self._log.debug('Lookup for task [%s]...', tname)
            results = df[df[name_key] == tname][[name_key, pid_key]]
            if len(results) == 0:
                self._log.error('  task %16s NOT found', tname)
                continue
            (name, pid) = results.head(1).values[0]
            if name != tname:
                self._log.error('  task %16s NOT found', tname)
                continue
            if tname not in self.tasks:
                self.tasks[tname] = {}
            pids = list(results[pid_key].unique())
            self.tasks[tname]['pid'] = pids
            self._log.debug('  task %16s found, pid: %s',
                            tname, self.tasks[tname]['pid'])
        return self.tasks
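    # Illustrative usage of the task lookup helpers above (a minimal sketch;
    # the task name and PID are hypothetical and assume a parsed trace with
    # sched_switch events):
    #
    #   trace.getTasks()                  # {'mytask': {'pid': [1234]}, ...}
    #   trace.getTaskByName('mytask')     # [1234]
    #   trace.getTaskByPid(1234)          # ['mytask']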
###############################################################################
# DataFrame Getter Methods
###############################################################################

    def df(self, event):
        """
        Get a dataframe containing all occurrences of the specified trace
        event in the parsed trace.

        :param event: Trace event name
        :type event: str
        """
        warnings.simplefilter('always', DeprecationWarning)  # turn off filter
        warnings.warn("\n\tUse of Trace::df() is deprecated and will soon be removed."
                      "\n\tUse Trace::data_frame.trace_event(event_name) instead.",
                      category=DeprecationWarning)
        warnings.simplefilter('default', DeprecationWarning)  # reset filter
        return self._dfg_trace_event(event)

    def _dfg_trace_event(self, event):
        """
        Get a dataframe containing all occurrences of the specified trace
        event in the parsed trace.

        :param event: Trace event name
        :type event: str
        """
        if self.data_dir is None:
            raise ValueError("trace data not (yet) loaded")
        if self.ftrace and hasattr(self.ftrace, event):
            return getattr(self.ftrace, event).data_frame
        raise ValueError('Event [{}] not supported. '
                         'Supported events are: {}'
                         .format(event, self.available_events))

    def _dfg_functions_stats(self, functions=None):
        """
        Get a DataFrame of specified kernel functions profile data.

        For each profiled function a DataFrame is returned which reports stats
        on kernel functions execution time. The reported stats are per-CPU and
        include: number of times the function has been executed (hits),
        average execution time (avg), overall execution time (time) and
        samples variance (s_2).
        By default returns a DataFrame of all the functions profiled.

        :param functions: the name of the function or a list of function names
            to report
        :type functions: str or list(str)
        """
        if not hasattr(self, '_functions_stats_df'):
            return None
        df = self._functions_stats_df
        if not functions:
            return df
        return df.loc[df.index.get_level_values(1).isin(listify(functions))]
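    # Illustrative use of the functions profiling getter above (a minimal
    # sketch; the function name "pick_next_task_fair" is an arbitrary example
    # and assumes a trace.stats file was collected alongside the trace):
    #
    #   stats = trace.data_frame.functions_stats('pick_next_task_fair')
    #   if stats is not None:
    #       print stats[['hits', 'avg', 'time']]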
    # cgroup_attach_task with just merged fake and real events
    def _cgroup_attach_task(self):
        cgroup_events = ['cgroup_attach_task', 'cgroup_attach_task_devlib']
        df = None

        if set(cgroup_events).isdisjoint(set(self.available_events)):
            self._log.error('At least one of {} is needed for cgroup_attach_task '
                            'event generation'.format(cgroup_events))
            return None

        for cev in cgroup_events:
            if not cev in self.available_events:
                continue
            cdf = self._dfg_trace_event(cev)
            cdf = cdf[['__line', 'pid', 'controller', 'cgroup']]
            if not isinstance(df, pd.DataFrame):
                df = cdf
            else:
                df = pd.concat([cdf, df])

        # Always drop na since this DF is used as secondary
        df.dropna(inplace=True, how='any')
        return df

    @memoized
    def _dfg_cgroup_attach_task(self, controllers = ['schedtune', 'cpuset']):
        # Since fork doesn't result in attach events, generate fake attach events.
        # The below mechanism doesn't propagate nested fork levels:
        # For ex:
        #   cgroup_attach_task: pid=1166
        #   fork: pid=1166 child_pid=2222   <-- fake attach generated
        #   fork: pid=2222 child_pid=3333   <-- fake attach not generated
        def fork_add_cgroup(fdf, cdf, controller):
            cdf = cdf[cdf['controller'] == controller]
            ret_df = trappy.utils.merge_dfs(fdf, cdf, pivot='pid')
            return ret_df

        if not 'sched_process_fork' in self.available_events:
            self._log.error('sched_process_fork is mandatory to get proper '
                            'cgroup_attach events')
            return None
        fdf = self._dfg_trace_event('sched_process_fork')

        forks_len = len(fdf)
        forkdf = fdf
        cdf = self._cgroup_attach_task()
        for idx, c in enumerate(controllers):
            fdf = fork_add_cgroup(fdf, cdf, c)
            if (idx != (len(controllers) - 1)):
                fdf = pd.concat([fdf, forkdf]).sort_values(by='__line')

        fdf = fdf[['__line', 'child_pid', 'controller', 'cgroup']]
        fdf.rename(columns = { 'child_pid': 'pid' }, inplace=True)

        # Always drop na since this DF is used as secondary
        fdf.dropna(inplace=True, how='any')

        new_forks_len = len(fdf) / len(controllers)

        fdf = pd.concat([fdf, cdf]).sort_values(by='__line')

        if new_forks_len < forks_len:
            dropped = forks_len - new_forks_len
            self._log.info("Couldn't associate all forks with cgroup attach "
                           "events ({} dropped)".format(dropped))

        return fdf
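    # Illustrative use of the cgroup attach-task getter above (a minimal
    # sketch; the controller names are the defaults and assume cgroup_info
    # was passed to the Trace constructor):
    #
    #   attach_df = trace.data_frame.cgroup_attach_task()
    #   if attach_df is not None:
    #       print attach_df[['pid', 'controller', 'cgroup']].head()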
    @memoized
    def _dfg_sched_switch_cgroup(self, controllers = ['schedtune', 'cpuset']):
        def sched_switch_add_cgroup(sdf, cdf, controller, direction):
            cdf = cdf[cdf['controller'] == controller]

            ret_df = sdf.rename(columns = { direction + '_pid': 'pid' })
            ret_df = trappy.utils.merge_dfs(ret_df, cdf, pivot='pid')
            ret_df.rename(columns = { 'pid': direction + '_pid' }, inplace=True)

            ret_df.drop('controller', axis=1, inplace=True)
            ret_df.rename(columns = { 'cgroup': direction + '_' + controller },
                          inplace=True)
            return ret_df

        if not 'sched_switch' in self.available_events:
            self._log.error('sched_switch is mandatory to generate '
                            'sched_switch_cgroup event')
            return None
        sdf = self._dfg_trace_event('sched_switch')
        cdf = self._dfg_cgroup_attach_task()

        for c in controllers:
            sdf = sched_switch_add_cgroup(sdf, cdf, c, 'next')
            sdf = sched_switch_add_cgroup(sdf, cdf, c, 'prev')

        # Augment with TGID information
        sdf = sdf.join(self._pid_tgid, on='next_pid').rename(
            columns = {'tgid': 'next_tgid'})
        sdf = sdf.join(self._pid_tgid, on='prev_pid').rename(
            columns = {'tgid': 'prev_tgid'})

        df = self._tasks_by_pid.rename(columns = { 'next_comm': 'comm' })
        sdf = sdf.join(df, on='next_tgid').rename(
            columns = {'comm': 'next_tgid_comm'})
        sdf = sdf.join(df, on='prev_tgid').rename(
            columns = {'comm': 'prev_tgid_comm'})
        return sdf

###############################################################################
# Trace Events Sanitize Methods
###############################################################################

    def _sanitize_SchedCpuCapacity(self):
        """
        Add more columns to cpu_capacity data frame if the energy model is
        available.
        """
        if not self.hasEvents('cpu_capacity') \
           or 'nrg_model' not in self.platform:
            return

        df = self._dfg_trace_event('cpu_capacity')

        # Add column with LITTLE and big CPUs max capacities
        nrg_model = self.platform['nrg_model']
        max_lcap = nrg_model['little']['cpu']['cap_max']
        max_bcap = nrg_model['big']['cpu']['cap_max']
        df['max_capacity'] = np.select(
            [df.cpu.isin(self.platform['clusters']['little'])],
            [max_lcap], max_bcap)
        # Add LITTLE and big CPUs "tipping point" threshold
        tip_lcap = 0.8 * max_lcap
        tip_bcap = 0.8 * max_bcap
        df['tip_capacity'] = np.select(
            [df.cpu.isin(self.platform['clusters']['little'])],
            [tip_lcap], tip_bcap)

    def _sanitize_SchedLoadAvgCpu(self):
        """
        If necessary, rename certain signal names from v5.0 to v5.1 format.
        """
        if not self.hasEvents('sched_load_avg_cpu'):
            return
        df = self._dfg_trace_event('sched_load_avg_cpu')
        if 'utilization' in df:
            df.rename(columns={'utilization': 'util_avg'}, inplace=True)
            df.rename(columns={'load': 'load_avg'}, inplace=True)

    def _sanitize_SchedLoadAvgTask(self):
        """
        If necessary, rename certain signal names from v5.0 to v5.1 format.
        """
        if not self.hasEvents('sched_load_avg_task'):
            return
        df = self._dfg_trace_event('sched_load_avg_task')
        if 'utilization' in df:
            df.rename(columns={'utilization': 'util_avg'}, inplace=True)
            df.rename(columns={'load': 'load_avg'}, inplace=True)
            df.rename(columns={'avg_period': 'period_contrib'}, inplace=True)
            df.rename(columns={'runnable_avg_sum': 'load_sum'}, inplace=True)
            df.rename(columns={'running_avg_sum': 'util_sum'}, inplace=True)
        df['cluster'] = np.select(
            [df.cpu.isin(self.platform['clusters']['little'])],
            ['LITTLE'], 'big')
        # Add a column which represents the max capacity of the smallest
        # cluster which can accommodate the task utilization
        little_cap = self.platform['nrg_model']['little']['cpu']['cap_max']
        big_cap = self.platform['nrg_model']['big']['cpu']['cap_max']
        df['min_cluster_cap'] = df.util_avg.map(
            lambda util_avg: big_cap if util_avg > little_cap else little_cap
        )

    def _sanitize_SchedBoostCpu(self):
        """
        Add a boosted utilization signal as the sum of utilization and margin.

        Also, if necessary, rename certain signal names from v5.0 to v5.1
        format.
        """
        if not self.hasEvents('sched_boost_cpu'):
            return
        df = self._dfg_trace_event('sched_boost_cpu')
        if 'usage' in df:
            df.rename(columns={'usage': 'util'}, inplace=True)
        df['boosted_util'] = df['util'] + df['margin']
714 """ 715 if not self.hasEvents('sched_boost_cpu'): 716 return 717 df = self._dfg_trace_event('sched_boost_cpu') 718 if 'usage' in df: 719 df.rename(columns={'usage': 'util'}, inplace=True) 720 df['boosted_util'] = df['util'] + df['margin'] 721 722 def _sanitize_SchedBoostTask(self): 723 """ 724 Add a boosted utilization signal as the sum of utilization and margin. 725 726 Also, if necessary, rename certain signal names from v5.0 to v5.1 727 format. 728 """ 729 if not self.hasEvents('sched_boost_task'): 730 return 731 df = self._dfg_trace_event('sched_boost_task') 732 if 'utilization' in df: 733 # Convert signals name from to v5.1 format 734 df.rename(columns={'utilization': 'util'}, inplace=True) 735 df['boosted_util'] = df['util'] + df['margin'] 736 737 def _sanitize_SchedEnergyDiff(self): 738 """ 739 If a energy model is provided, some signals are added to the 740 sched_energy_diff trace event data frame. 741 742 Also convert between existing field name formats for sched_energy_diff 743 """ 744 if not self.hasEvents('sched_energy_diff') \ 745 or 'nrg_model' not in self.platform: 746 return 747 nrg_model = self.platform['nrg_model'] 748 em_lcluster = nrg_model['little']['cluster'] 749 em_bcluster = nrg_model['big']['cluster'] 750 em_lcpu = nrg_model['little']['cpu'] 751 em_bcpu = nrg_model['big']['cpu'] 752 lcpus = len(self.platform['clusters']['little']) 753 bcpus = len(self.platform['clusters']['big']) 754 SCHED_LOAD_SCALE = 1024 755 756 power_max = em_lcpu['nrg_max'] * lcpus + em_bcpu['nrg_max'] * bcpus + \ 757 em_lcluster['nrg_max'] + em_bcluster['nrg_max'] 758 self._log.debug( 759 "Maximum estimated system energy: {0:d}".format(power_max)) 760 761 df = self._dfg_trace_event('sched_energy_diff') 762 763 translations = {'nrg_d' : 'nrg_diff', 764 'utl_d' : 'usage_delta', 765 'payoff' : 'nrg_payoff' 766 } 767 df.rename(columns=translations, inplace=True) 768 769 df['nrg_diff_pct'] = SCHED_LOAD_SCALE * df.nrg_diff / power_max 770 771 # Tag columns by usage_delta 772 ccol = df.usage_delta 773 df['usage_delta_group'] = np.select( 774 [ccol < 150, ccol < 400, ccol < 600], 775 ['< 150', '< 400', '< 600'], '>= 600') 776 777 # Tag columns by nrg_payoff 778 ccol = df.nrg_payoff 779 df['nrg_payoff_group'] = np.select( 780 [ccol > 2e9, ccol > 0, ccol > -2e9], 781 ['Optimal Accept', 'SchedTune Accept', 'SchedTune Reject'], 782 'Suboptimal Reject') 783 784 def _sanitize_SchedOverutilized(self): 785 """ Add a column with overutilized status duration. 
""" 786 if not self.hasEvents('sched_overutilized'): 787 return 788 df = self._dfg_trace_event('sched_overutilized') 789 df['start'] = df.index 790 df['len'] = (df.start - df.start.shift()).fillna(0).shift(-1) 791 df.drop('start', axis=1, inplace=True) 792 793 # Sanitize cgroup information helper 794 def _helper_sanitize_CgroupAttachTask(self, df, allowed_cgroups, controller_id_name): 795 # Drop rows that aren't in the root-id -> name map 796 df = df[df['dst_root'].isin(controller_id_name.keys())] 797 798 def get_cgroup_name(path, valid_names): 799 name = os.path.basename(path) 800 name = 'root' if not name in valid_names else name 801 return name 802 803 def get_cgroup_names(rows): 804 ret = [] 805 for r in rows.iterrows(): 806 ret.append(get_cgroup_name(r[1]['dst_path'], allowed_cgroups)) 807 return ret 808 809 def get_controller_names(rows): 810 ret = [] 811 for r in rows.iterrows(): 812 ret.append(controller_id_name[r[1]['dst_root']]) 813 return ret 814 815 # Sanitize cgroup names 816 # cgroup column isn't in mainline, add it in 817 # its already added for some out of tree kernels so check first 818 if not 'cgroup' in df.columns: 819 if not 'dst_path' in df.columns: 820 raise RuntimeError('Cant santize cgroup DF, need dst_path') 821 df = df.assign(cgroup = get_cgroup_names) 822 823 # Sanitize controller names 824 if not 'controller' in df.columns: 825 if not 'dst_root' in df.columns: 826 raise RuntimeError('Cant santize cgroup DF, need dst_path') 827 df = df.assign(controller = get_controller_names) 828 829 return df 830 831 def _sanitize_CgroupAttachTask(self): 832 def sanitize_cgroup_event(name): 833 if not name in self.available_events: 834 return 835 836 df = self._dfg_trace_event(name) 837 838 if len(df.groupby(level=0).filter(lambda x: len(x) > 1)) > 0: 839 self._log.warning('Timstamp Collisions seen in {} event!'.format(name)) 840 841 df = self._helper_sanitize_CgroupAttachTask(df, self.cgroup_info['cgroups'], 842 self.cgroup_info['controller_ids']) 843 getattr(self.ftrace, name).data_frame = df 844 sanitize_cgroup_event('cgroup_attach_task') 845 sanitize_cgroup_event('cgroup_attach_task_devlib') 846 847 def _chunker(self, seq, size): 848 """ 849 Given a data frame or a series, generate a sequence of chunks of the 850 given size. 851 852 :param seq: data to be split into chunks 853 :type seq: :mod:`pandas.Series` or :mod:`pandas.DataFrame` 854 855 :param size: size of each chunk 856 :type size: int 857 """ 858 return (seq.iloc[pos:pos + size] for pos in range(0, len(seq), size)) 859 860 def _sanitize_CpuFrequency(self): 861 """ 862 Verify that all platform reported clusters are frequency coherent (i.e. 863 frequency scaling is performed at a cluster level). 
864 """ 865 if not self.hasEvents('cpu_frequency_devlib'): 866 return 867 868 devlib_freq = self._dfg_trace_event('cpu_frequency_devlib') 869 devlib_freq.rename(columns={'cpu_id':'cpu'}, inplace=True) 870 devlib_freq.rename(columns={'state':'frequency'}, inplace=True) 871 872 df = self._dfg_trace_event('cpu_frequency') 873 clusters = self.platform['clusters'] 874 875 # devlib always introduces fake cpu_frequency events, in case the 876 # OS has not generated cpu_frequency envets there are the only 877 # frequency events to report 878 if len(df) == 0: 879 # Register devlib injected events as 'cpu_frequency' events 880 setattr(self.ftrace.cpu_frequency, 'data_frame', devlib_freq) 881 df = devlib_freq 882 self.available_events.append('cpu_frequency') 883 884 # make sure fake cpu_frequency events are never interleaved with 885 # OS generated events 886 else: 887 if len(devlib_freq) > 0: 888 889 # Frequencies injection is done in a per-cluster based. 890 # This is based on the assumption that clusters are 891 # frequency choerent. 892 # For each cluster we inject devlib events only if 893 # these events does not overlaps with os-generated ones. 894 895 # Inject "initial" devlib frequencies 896 os_df = df 897 dl_df = devlib_freq.iloc[:self.platform['cpus_count']] 898 for _,c in self.platform['clusters'].iteritems(): 899 dl_freqs = dl_df[dl_df.cpu.isin(c)] 900 os_freqs = os_df[os_df.cpu.isin(c)] 901 self._log.debug("First freqs for %s:\n%s", c, dl_freqs) 902 # All devlib events "before" os-generated events 903 self._log.debug("Min os freq @: %s", os_freqs.index.min()) 904 if os_freqs.empty or \ 905 os_freqs.index.min() > dl_freqs.index.max(): 906 self._log.debug("Insert devlib freqs for %s", c) 907 df = pd.concat([dl_freqs, df]) 908 909 # Inject "final" devlib frequencies 910 os_df = df 911 dl_df = devlib_freq.iloc[self.platform['cpus_count']:] 912 for _,c in self.platform['clusters'].iteritems(): 913 dl_freqs = dl_df[dl_df.cpu.isin(c)] 914 os_freqs = os_df[os_df.cpu.isin(c)] 915 self._log.debug("Last freqs for %s:\n%s", c, dl_freqs) 916 # All devlib events "after" os-generated events 917 self._log.debug("Max os freq @: %s", os_freqs.index.max()) 918 if os_freqs.empty or \ 919 os_freqs.index.max() < dl_freqs.index.min(): 920 self._log.debug("Append devlib freqs for %s", c) 921 df = pd.concat([df, dl_freqs]) 922 923 df.sort_index(inplace=True) 924 925 setattr(self.ftrace.cpu_frequency, 'data_frame', df) 926 927 # Frequency Coherency Check 928 for _, cpus in clusters.iteritems(): 929 cluster_df = df[df.cpu.isin(cpus)] 930 for chunk in self._chunker(cluster_df, len(cpus)): 931 f = chunk.iloc[0].frequency 932 if any(chunk.frequency != f): 933 self._log.warning('Cluster Frequency is not coherent! ' 934 'Failure in [cpu_frequency] events at:') 935 self._log.warning(chunk) 936 self.freq_coherency = False 937 return 938 self._log.info('Platform clusters verified to be Frequency coherent') 939 940 ############################################################################### 941 # Utility Methods 942 ############################################################################### 943 944 def integrate_square_wave(self, sq_wave): 945 """ 946 Compute the integral of a square wave time series. 
    def _loadFunctionsStats(self, path='trace.stats'):
        """
        Read functions profiling file and build a data frame containing all
        relevant data.

        :param path: path to the functions profiling trace file
        :type path: str
        """
        if os.path.isdir(path):
            path = os.path.join(path, 'trace.stats')
        if path.endswith('dat') or path.endswith('html'):
            pre, ext = os.path.splitext(path)
            path = pre + '.stats'
        if not os.path.isfile(path):
            return False

        # Opening functions profiling JSON data file
        self._log.debug('Loading functions profiling data from [%s]...', path)
        with open(path, 'r') as fh:
            trace_stats = json.load(fh)

        # Build DataFrame of function stats
        frames = {}
        for cpu, data in trace_stats.iteritems():
            frames[int(cpu)] = pd.DataFrame.from_dict(data, orient='index')

        # Build and keep track of the DataFrame
        self._functions_stats_df = pd.concat(frames.values(),
                                             keys=frames.keys())

        return len(self._functions_stats_df) > 0

    @memoized
    def getCPUActiveSignal(self, cpu):
        """
        Build a square wave representing the active (i.e. non-idle) CPU time,
        i.e.:

          cpu_active[t] == 1 if the CPU is reported to be non-idle by cpuidle
          at time t
          cpu_active[t] == 0 otherwise

        :param cpu: CPU ID
        :type cpu: int

        :returns: A :mod:`pandas.Series` or ``None`` if the trace contains no
            "cpu_idle" events
        """
        if not self.hasEvents('cpu_idle'):
            self._log.warning('Events [cpu_idle] not found, '
                              'cannot compute CPU active signal!')
            return None

        idle_df = self._dfg_trace_event('cpu_idle')
        cpu_df = idle_df[idle_df.cpu_id == cpu]

        cpu_active = cpu_df.state.apply(
            lambda s: 1 if s == NON_IDLE_STATE else 0
        )

        start_time = 0.0
        if not self.ftrace.normalized_time:
            start_time = self.ftrace.basetime

        if cpu_active.empty:
            cpu_active = pd.Series([0], index=[start_time])
        elif cpu_active.index[0] != start_time:
            entry_0 = pd.Series(cpu_active.iloc[0] ^ 1, index=[start_time])
            cpu_active = pd.concat([entry_0, cpu_active])

        # Fix sequences of wakeup/sleep events reported with the same index
        return handle_duplicate_index(cpu_active)
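    # How the active-signal helpers are meant to compose (a minimal sketch;
    # CPU 0 is an arbitrary choice and 'trace' is an already constructed
    # Trace instance):
    #
    #   active = trace.getCPUActiveSignal(0)
    #   if active is not None:
    #       busy_s = trace.integrate_square_wave(active)  # seconds CPU0 was non-idle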
    @memoized
    def getClusterActiveSignal(self, cluster):
        """
        Build a square wave representing the active (i.e. non-idle) cluster
        time, i.e.:

          cluster_active[t] == 1 if at least one CPU is reported to be
          non-idle by cpuidle at time t
          cluster_active[t] == 0 otherwise

        :param cluster: list of CPU IDs belonging to a cluster
        :type cluster: list(int)

        :returns: A :mod:`pandas.Series` or ``None`` if the trace contains no
            "cpu_idle" events
        """
        if not self.hasEvents('cpu_idle'):
            self._log.warning('Events [cpu_idle] not found, '
                              'cannot compute cluster active signal!')
            return None

        active = self.getCPUActiveSignal(cluster[0]).to_frame(name=cluster[0])
        for cpu in cluster[1:]:
            active = active.join(
                self.getCPUActiveSignal(cpu).to_frame(name=cpu),
                how='outer'
            )

        active.fillna(method='ffill', inplace=True)

        # Cluster active is the OR between the actives on each CPU
        # belonging to that specific cluster
        cluster_active = reduce(
            operator.or_,
            [cpu_active.astype(int) for _, cpu_active in
             active.iteritems()]
        )

        return cluster_active


class TraceData:
    """ A DataFrame collector exposed to Trace's clients """
    pass

# vim :set tabstop=4 shiftwidth=4 expandtab
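if __name__ == '__main__':
    # Minimal usage sketch (assumptions: a trace directory path is passed as
    # the first command line argument, and an empty platform description is
    # acceptable because no platform-dependent sanitization is needed for the
    # events parsed here).
    logging.basicConfig(level=logging.INFO)
    trace_dir = sys.argv[1] if len(sys.argv) > 1 else '.'
    trace = Trace({}, trace_dir, events=['sched_switch', 'cpu_idle'])
    print 'Trace spans %.3f [s], available events: %s' \
        % (trace.time_range, trace.available_events)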