# Copyright 2015-2016 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Scheduler specific functionality for the
stats framework

The scheduler stats aggregation is based on a signal
which is generated by the combination of two triggers
from the events with the following parameters

====================  =====  ========
EVENT                 VALUE  FILTERS
====================  =====  ========
:func:`sched_switch`  1      next_pid
:func:`sched_switch`  -1     prev_pid
====================  =====  ========

Both these triggers are provided by the event
:mod:`trappy.sched.SchedSwitch`, which corresponds to
the :code:`sched_switch` unique word in the trace

.. seealso:: :mod:`trappy.stats.Trigger.Trigger`

Using the above information the following signals are
generated.

**EVENT SERIES**

This is a combination of the two triggers specified
above. It has alternating +/- 1 values and merely
represents the points in time when the process
started or stopped running on a CPU.

**RESIDENCY SERIES**

This series is the cumulative sum of the event series and
represents the continuous residency of the
process on a CPU.
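
For illustration, a task that runs on a CPU from 1.0s to 2.0s and
again from 4.0s to 4.5s would conceptually produce the two series
below (the timestamps are made up for this sketch and do not come
from a real trace)::

    import pandas as pd

    # Event series: +1 on a switch in, -1 on a switch out
    event_series = pd.Series([1, -1, 1, -1], index=[1.0, 2.0, 4.0, 4.5])

    # Residency series: 1 while the task is running, 0 otherwise
    residency_series = event_series.cumsum()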

The pivot for the aggregators is the CPU on which the
event occurred. If N is the number of CPUs in the
system, N signals, one per CPU, are generated. These signals
can then be aggregated by specifying a Topology.

.. seealso:: :mod:`trappy.stats.Topology.Topology`
"""

import numpy as np
from trappy.stats.Trigger import Trigger

WINDOW_SIZE = 0.0001
"""A control config for filtering events. Some analyses
may require ignoring of small interruptions"""

# Trigger Values
SCHED_SWITCH_IN = 1
"""Value of the event when a task is **switched in**
or scheduled on a CPU"""
SCHED_SWITCH_OUT = -1
"""Value of the event when a task is **switched out**
or relinquishes a CPU"""
NO_EVENT = 0
"""Signifies no event on an event trace"""

# Field Names
CPU_FIELD = "__cpu"
"""The column in the sched_switch event that
indicates the CPU on which the event occurred
"""
NEXT_PID_FIELD = "next_pid"
"""The column in the sched_switch event that
indicates the PID of the next process to be scheduled
"""
PREV_PID_FIELD = "prev_pid"
"""The column in the sched_switch event that
indicates the PID of the process that was scheduled
out
"""
TASK_RUNNING = 1
"""In a residency series, a one indicates
that the task is running
"""
TASK_NOT_RUNNING = 0
"""In a residency series, a zero indicates
that the task is not running
"""
TIME_INVAL = -1
"""Standard value to indicate invalid time data"""
SERIES_SANTIZED = "_sched_sanitized"
"""A memoized flag which is set when an event series
is checked for boundary conditions
"""


def sanitize_asymmetry(series, window=None):
    """Sanitize the cases when a :code:`SWITCH_OUT`
    happens before a :code:`SWITCH_IN` (the case when
    a process is already running before the trace started)

    :param series: Input Time Series data
    :type series: :mod:`pandas.Series`

    :param window: A tuple indicating a time window
    :type window: tuple
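
    A minimal sketch of the boundary handling, using a synthetic
    series (the values and timestamps are made up for illustration).
    The series below starts with a switch out and ends with a switch
    in, i.e. the task was already running when the trace started and
    was still running when it ended, so both boundary events are
    cleared::

        import pandas as pd

        series = pd.Series([-1, 1, -1, 1], index=[1.0, 2.0, 3.0, 4.0])
        series = sanitize_asymmetry(series)
        # series values are now [0, 1, -1, 0]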
    """

    if not hasattr(series, SERIES_SANTIZED):

        events = series[series != 0]
        if len(series) >= 2 and len(events):
            if series.values[0] == SCHED_SWITCH_OUT:
                series.values[0] = TASK_NOT_RUNNING

            elif events.values[0] == SCHED_SWITCH_OUT:
                series.values[0] = SCHED_SWITCH_IN
                if window:
                    series.index.values[0] = window[0]

            if series.values[-1] == SCHED_SWITCH_IN:
                series.values[-1] = TASK_NOT_RUNNING

            elif events.values[-1] == SCHED_SWITCH_IN:
                series.values[-1] = SCHED_SWITCH_OUT
                if window:
                    series.index.values[-1] = window[1]

        # No point if the series just has one value and
        # one event. We do not have sufficient data points
        # for any calculation. We should ideally never reach
        # here.
        elif len(series) == 1:
            series.values[0] = 0

        setattr(series, SERIES_SANTIZED, True)

    return series


def csum(series, window=None, filter_gaps=False):
    """:func:`aggfunc` for the cumulative sum of the
    input series data

    :param series: Input Time Series data
    :type series: :mod:`pandas.Series`

    :param window: A tuple indicating a time window
    :type window: tuple

    :param filter_gaps: If set, a process being switched out
        for less than :mod:`bart.sched.functions.WINDOW_SIZE` is
        ignored. This is helpful when small interruptions need
        to be ignored when comparing overall correlation
    :type filter_gaps: bool
    """

    if filter_gaps:
        series = filter_small_gaps(series)

    series = series.cumsum()
    return select_window(series, window)


def filter_small_gaps(series):
    """A helper function that filters out gaps in a
    residency series that are smaller than
    :mod:`bart.sched.functions.WINDOW_SIZE`

    :param series: Input Time Series data
    :type series: :mod:`pandas.Series`
    """

    start = None
    for index, value in series.iteritems():

        if value == SCHED_SWITCH_IN:
            if start is None:
                continue

            if index - start < WINDOW_SIZE:
                series[start] = NO_EVENT
                series[index] = NO_EVENT

            start = None

        if value == SCHED_SWITCH_OUT:
            start = index

    return series


def first_cpu(series, window=None):
    """:func:`aggfunc` to calculate the time of
    the first switch in event in the series.
    This is returned as a vector of unit length
    so that it can be aggregated and reduced across
    nodes to find the first CPU of a task

    :param series: Input Time Series data
    :type series: :mod:`pandas.Series`

    :param window: A tuple indicating a time window
    :type window: tuple
    """
    series = select_window(series, window)
    series = series[series == SCHED_SWITCH_IN]
    if len(series):
        return [series.index.values[0]]
    else:
        return [float("inf")]


def last_cpu(series, window=None):
    """:func:`aggfunc` to calculate the time of
    the last switch out event in the series.
    This is returned as a vector of unit length
    so that it can be aggregated and reduced across
    nodes to find the last CPU of a task

    :param series: Input Time Series data
    :type series: :mod:`pandas.Series`

    :param window: A tuple indicating a time window
    :type window: tuple
    """
    series = select_window(series, window)
    series = series[series == SCHED_SWITCH_OUT]

    if len(series):
        return [series.index.values[-1]]
    else:
        return [0]


def select_window(series, window):
    """Helper function to select a portion of
    a pandas time series

    :param series: Input Time Series data
    :type series: :mod:`pandas.Series`

    :param window: A tuple indicating a time window
    :type window: tuple
    """

    if not window:
        return series

    start, stop = window
    ix = series.index
    selector = ((ix >= start) & (ix <= stop))
    window_series = series[selector]
    return window_series


def residency_sum(series, window=None):
    """:func:`aggfunc` to calculate the total
    residency

    The input series is processed for
    intervals between a :mod:`bart.sched.functions.SCHED_SWITCH_IN`
    and a :mod:`bart.sched.functions.SCHED_SWITCH_OUT` to track
    the additive residency of a task

    .. math::

        S_{in} = i_{1}, i_{2}...i_{N} \\\\
        S_{out} = o_{1}, o_{2}...o_{N} \\\\
        R_{total} = \\sum_{k}^{N}\\Delta_k = \\sum_{k}^{N}(o_{k} - i_{k})

    :param series: Input Time Series data
    :type series: :mod:`pandas.Series`

    :param window: A tuple indicating a time window
    :type window: tuple

    :return: A scalar float value
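
    A minimal sketch with a synthetic event series (the timestamps
    are made up for illustration): the task runs from 1.0s to 2.0s
    and again from 4.0s to 4.5s, giving a total residency of 1.5
    seconds::

        import pandas as pd

        series = pd.Series([1, -1, 1, -1], index=[1.0, 2.0, 4.0, 4.5])
        total = residency_sum(series)   # 1.5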
    """

    if not len(series):
        return 0.0

    org_series = series
    series = select_window(series, window)
    series = sanitize_asymmetry(series, window)

    s_in = series[series == SCHED_SWITCH_IN]
    s_out = series[series == SCHED_SWITCH_OUT]

    if not (len(s_in) and len(s_out)):
        try:
            org_series = sanitize_asymmetry(org_series)
            running = select_window(org_series.cumsum(), window)
            if running.values[0] == TASK_RUNNING and running.values[-1] == TASK_RUNNING:
                return window[1] - window[0]
        except Exception:
            pass

    if len(s_in) != len(s_out):
        raise RuntimeError(
            "Unexpected Lengths: s_in={}, s_out={}".format(
                len(s_in),
                len(s_out)))
    else:
        return np.sum(s_out.index.values - s_in.index.values)


def first_time(series, value, window=None):
    """:func:`aggfunc` to:

    - Return the first index where the
      series == value

    - If no such index is found,
      +inf is returned

    :param series: Input Time Series data
    :type series: :mod:`pandas.Series`

    :param window: A tuple indicating a time window
    :type window: tuple

    :return: A vector of unit length
    """

    series = select_window(series, window)
    series = series[series == value]

    if not len(series):
        return [float("inf")]

    return [series.index.values[0]]


def period(series, align="start", window=None):
    """This :func:`aggfunc` computes the deltas between
    successive triggers, from which the average period
    of a task can be derived:

    - When :code:`align=start` the :code:`SCHED_IN`
      trigger is used

    - When :code:`align=end` the :code:`SCHED_OUT`
      trigger is used

    .. math::

        E = e_{1}, e_{2}...e_{N} \\\\
        T_p = \\frac{\\sum_{j}^{\\lfloor N/2 \\rfloor}(e_{2j + 1} - e_{2j})}{N}

    :param series: Input Time Series data
    :type series: :mod:`pandas.Series`

    :param window: A tuple indicating a time window
    :type window: tuple

    :return:
        A list of deltas of successive starts/stops
        of a task
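
    A minimal sketch with a synthetic event series (the timestamps
    are made up for illustration): a task switching in at 0s, 2s and
    4s has start-aligned deltas of 2 seconds::

        import pandas as pd

        series = pd.Series([1, -1, 1, -1, 1, -1],
                           index=[0.0, 1.0, 2.0, 3.0, 4.0, 5.0])
        deltas = period(series, align="start")   # [2.0, 2.0]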
    """

    series = select_window(series, window)
    series = sanitize_asymmetry(series, window)

    if align == "start":
        series = series[series == SCHED_SWITCH_IN]
    elif align == "end":
        series = series[series == SCHED_SWITCH_OUT]

    if len(series) % 2 == 0:
        series = series[:1]

    if not len(series):
        return []

    return list(np.diff(series.index.values))


def last_time(series, value, window=None):
    """:func:`aggfunc` to:

    - Return the last index where the
      series == value

    - If no such index is found,
      :mod:`bart.sched.functions.TIME_INVAL`
      is returned

    :param series: Input Time Series data
    :type series: :mod:`pandas.Series`

    :param window: A tuple indicating a time window
    :type window: tuple

    :return: A vector of unit length
    """

    series = select_window(series, window)
    series = series[series == value]
    if not len(series):
        return [TIME_INVAL]

    return [series.index.values[-1]]


def binary_correlate(series_x, series_y):
    """Helper function to correlate binary data

    Both series should have the same indices.

    For binary time series data:

    .. math::

        \\alpha_{corr} = \\frac{N_{agree} - N_{disagree}}{N}

    :param series_x: First time series data
    :type series_x: :mod:`pandas.Series`

    :param series_y: Second time series data
    :type series_y: :mod:`pandas.Series`
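
    A minimal sketch with synthetic residency data (the values are
    made up for illustration): the two series below agree at three
    of their four points, giving a correlation of (3 - 1) / 4 = 0.5::

        import pandas as pd

        x = pd.Series([1, 0, 1, 0], index=[0.0, 1.0, 2.0, 3.0])
        y = pd.Series([1, 1, 1, 0], index=[0.0, 1.0, 2.0, 3.0])
        corr = binary_correlate(x, y)   # 0.5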
    """

    if len(series_x) != len(series_y):
        raise ValueError(
            "Cannot compute binary correlation for unequal vectors")

    agree = len(series_x[series_x == series_y])
    disagree = len(series_x[series_x != series_y])

    return (agree - disagree) / float(len(series_x))


def get_pids_for_process(ftrace, execname, cls=None):
    """Get the PIDs for a given process

    :param ftrace: A ftrace object with a sched_switch
        event
    :type ftrace: :mod:`trappy.ftrace.FTrace`

    :param execname: The name of the process
    :type execname: str

    :param cls: The SchedSwitch event class (required if
        a different event is to be used)
    :type cls: :mod:`trappy.base.Base`

    :return: The set of PIDs for the execname
    """

    if not cls:
        try:
            df = ftrace.sched_switch.data_frame
        except AttributeError:
            raise ValueError("SchedSwitch event not found in ftrace")

        if len(df) == 0:
            raise ValueError("SchedSwitch event not found in ftrace")
    else:
        event = getattr(ftrace, cls.name)
        df = event.data_frame

    mask = df["next_comm"].apply(lambda x: x == execname)
    return list(np.unique(df[mask]["next_pid"].values))


def get_task_name(ftrace, pid, cls=None):
    """Returns the execname for pid

    :param ftrace: A ftrace object with a sched_switch
        event
    :type ftrace: :mod:`trappy.ftrace.FTrace`

    :param pid: The PID of the process
    :type pid: int

    :param cls: The SchedSwitch event class (required if
        a different event is to be used)
    :type cls: :mod:`trappy.base.Base`

    :return: The execname for the PID
    """

    if not cls:
        try:
            df = ftrace.sched_switch.data_frame
        except AttributeError:
            raise ValueError("SchedSwitch event not found in ftrace")
    else:
        event = getattr(ftrace, cls.name)
        df = event.data_frame

    df = df[df["next_pid"] == pid]
    if not len(df):
        return ""
    else:
        return df["next_comm"].values[0]


def sched_triggers(ftrace, pid, sched_switch_class):
    """Returns the list of sched_switch triggers

    :param ftrace: A ftrace object with a sched_switch
        event
    :type ftrace: :mod:`trappy.ftrace.FTrace`

    :param pid: The PID of the associated process
    :type pid: int

    :param sched_switch_class: The SchedSwitch event class
    :type sched_switch_class: :mod:`trappy.base.Base`

    :return: List of triggers, such that
        ::

            triggers[0] = switch_in_trigger
            triggers[1] = switch_out_trigger
    """

    if not hasattr(ftrace, "sched_switch"):
        raise ValueError("SchedSwitch event not found in ftrace")

    triggers = []
    triggers.append(sched_switch_in_trigger(ftrace, pid, sched_switch_class))
    triggers.append(sched_switch_out_trigger(ftrace, pid, sched_switch_class))
    return triggers


def sched_switch_in_trigger(ftrace, pid, sched_switch_class):
    """
    :param ftrace: A ftrace object with a sched_switch
        event
    :type ftrace: :mod:`trappy.ftrace.FTrace`

    :param pid: The PID of the associated process
    :type pid: int

    :param sched_switch_class: The SchedSwitch event class
    :type sched_switch_class: :mod:`trappy.base.Base`

    :return: :mod:`trappy.stats.Trigger.Trigger` on
        the SchedSwitch: IN for the given PID
    """

    task_in = {}
    task_in[NEXT_PID_FIELD] = pid

    return Trigger(ftrace,
                   sched_switch_class,    # trappy Event Class
                   task_in,               # Filter Dictionary
                   SCHED_SWITCH_IN,       # Trigger Value
                   CPU_FIELD)             # Primary Pivot


def sched_switch_out_trigger(ftrace, pid, sched_switch_class):
    """
    :param ftrace: A ftrace object with a sched_switch
        event
    :type ftrace: :mod:`trappy.ftrace.FTrace`

    :param pid: The PID of the associated process
    :type pid: int

    :param sched_switch_class: The SchedSwitch event class
    :type sched_switch_class: :mod:`trappy.base.Base`

    :return: :mod:`trappy.stats.Trigger.Trigger` on
        the SchedSwitch: OUT for the given PID
    """

    task_out = {}
    task_out[PREV_PID_FIELD] = pid

    return Trigger(ftrace,
                   sched_switch_class,    # trappy Event Class
                   task_out,              # Filter Dictionary
                   SCHED_SWITCH_OUT,      # Trigger Value
                   CPU_FIELD)             # Primary Pivot


def trace_event(series, window=None):
    """
    :func:`aggfunc` to be used for plotting
    the process residency data using
    :mod:`trappy.plotter.EventPlot`

    :param series: Input Time Series data
    :type series: :mod:`pandas.Series`

    :param window: A tuple indicating a time window
    :type window: tuple

    :return: A list of events
        of the type:
        ::

            [
                [start_time_1, stop_time_1],
                [start_time_2, stop_time_2],
                #
                #
                [start_time_N, stop_time_N],
            ]
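
    A minimal sketch with a synthetic event series (the timestamps
    are made up for illustration)::

        import pandas as pd

        series = pd.Series([1, -1, 1, -1], index=[1.0, 2.0, 4.0, 4.5])
        events = trace_event(series)
        # [[1.0, 2.0], [4.0, 4.5]] as a numpy array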
    """
    rects = []
    series = select_window(series, window)
    series = sanitize_asymmetry(series, window)

    s_in = series[series == SCHED_SWITCH_IN]
    s_out = series[series == SCHED_SWITCH_OUT]

    if not len(s_in):
        return rects

    if len(s_in) != len(s_out):
        raise RuntimeError(
            "Unexpected Lengths: s_in={}, s_out={}".format(
                len(s_in),
                len(s_out)))

    return np.column_stack((s_in.index.values, s_out.index.values))