#    Copyright 2015-2016 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

     16 """
     17 :mod:`bart.sched.SchedAssert` provides ability to assert scheduler behaviour.
     18 The analysis is based on TRAPpy's statistics framework and is potent enough
     19 to aggregate statistics over processor hierarchies.
     20 """

import trappy
import itertools
import math
from trappy.stats.Aggregator import MultiTriggerAggregator
from bart.sched import functions as sched_funcs
from bart.common import Utils
import numpy as np

# pylint: disable=invalid-name
# pylint: disable=too-many-arguments
class SchedAssert(object):

    """The primary focus of this class is to assert and verify
    predefined scheduler scenarios. This does not compare parameters
    across runs.

    :param ftrace: A single trappy.FTrace object
        or a path that can be passed to trappy.FTrace
    :type ftrace: :mod:`trappy.ftrace.FTrace`

    :param topology: A topology that describes the arrangement of
        CPUs on a system. This is useful for multi-cluster systems
        where data needs to be aggregated at different topological
        levels
    :type topology: :mod:`trappy.stats.Topology.Topology`

    :param execname: The execname of the task to be analysed

        .. note::

                There should be only one PID that maps to the specified
                execname. If there are multiple PIDs,
                :mod:`bart.sched.SchedMultiAssert` should be used

    :type execname: str

    :param pid: The process ID of the task to be analysed
    :type pid: int

    .. note::

        One of pid or execname is mandatory. If only execname
        is specified, the current implementation will fail if
        more than one process has the same execname
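
    For example, a minimal construction (the trace path, topology and
    execname below are illustrative)::

        from trappy.stats.Topology import Topology
        from bart.sched.SchedAssert import SchedAssert

        topology = Topology(clusters=[[0, 3, 4, 5], [1, 2]])
        sa = SchedAssert("trace.dat", topology, execname="my_task")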
    """

    def __init__(self, ftrace, topology, execname=None, pid=None):

        ftrace = Utils.init_ftrace(ftrace)

        if not execname and not pid:
            raise ValueError("Need to specify at least one of pid or execname")

        self.execname = execname
        self._ftrace = ftrace
        self._pid = self._validate_pid(pid)
        self._aggs = {}
        self._topology = topology
        self._triggers = sched_funcs.sched_triggers(self._ftrace, self._pid,
                                                    trappy.sched.SchedSwitch)
        self.name = "{}-{}".format(self.execname, self._pid)

    def _validate_pid(self, pid):
        """Validate the passed pid argument"""

        if not pid:
            pids = sched_funcs.get_pids_for_process(self._ftrace,
                                                    self.execname)

            if len(pids) != 1:
                raise RuntimeError(
                    "There should be exactly one PID {0} for {1}".format(
                        pids,
                        self.execname))

            return pids[0]

        elif self.execname:

            pids = sched_funcs.get_pids_for_process(self._ftrace,
                                                    self.execname)
            if pid not in pids:
                raise RuntimeError(
                    "PID {0} not mapped to {1}".format(
                        pid,
                        self.execname))
        else:
            self.execname = sched_funcs.get_task_name(self._ftrace, pid)

        return pid

    def _aggregator(self, aggfunc):
        """
        Return an aggregator corresponding to aggfunc;
        aggregators are memoized for performance

        :param aggfunc: A function that
            accepts a :mod:`pandas.Series` object and
            returns a vector/scalar

        :type aggfunc: function(:mod:`pandas.Series`)
        """

        if aggfunc not in self._aggs:
            self._aggs[aggfunc] = MultiTriggerAggregator(self._triggers,
                                                         self._topology,
                                                         aggfunc)
        return self._aggs[aggfunc]

    def getResidency(self, level, node, window=None, percent=False):
        """
        Residency of the task is the amount of time it spends executing
        in a particular group of a topological level. For example:
        ::

            from trappy.stats.Topology import Topology

            big = [1, 2]
            little = [0, 3, 4, 5]

            topology = Topology(clusters=[little, big])

            s = SchedAssert(trace, topology, pid=123)
            s.getResidency("cluster", big)

        This will return the residency of the task on the big cluster. If
        percent is specified it will be normalized to the total runtime
        of the task

        :param level: The topological level to which the group belongs
        :type level: str

        :param node: The group of CPUs for which residency
            needs to be calculated
        :type node: list

        :param window: A (start, end) tuple to limit the scope of the
            residency calculation.
        :type window: tuple

        :param percent: If true the result is normalized to the total runtime
            of the task and returned as a percentage
        :type percent: bool

        .. math::

            R = \\frac{T_{group} \\times 100}{T_{total}}

        .. seealso:: :mod:`bart.sched.SchedAssert.SchedAssert.assertResidency`
        """

        # Get the index of the node in the level
        node_index = self._topology.get_index(level, node)

        agg = self._aggregator(sched_funcs.residency_sum)
        level_result = agg.aggregate(level=level, window=window)

        node_value = level_result[node_index]

        if percent:
            total = agg.aggregate(level="all", window=window)[0]
            node_value = node_value * 100
            node_value = node_value / total

        return node_value

    def assertResidency(
            self,
            level,
            node,
            expected_value,
            operator,
            window=None,
            percent=False):
        """
        :param level: The topological level to which the group belongs
        :type level: str

        :param node: The group of CPUs for which residency
            needs to be calculated
        :type node: list

        :param expected_value: The expected value of the residency
        :type expected_value: double

        :param operator: A binary operator function that returns
            a boolean. For example:
            ::

                import operator
                op = operator.ge
                assertResidency(level, node, expected_value, op)

            Will do the following check:
            ::

                getResidency(level, node) >= expected_value

            A custom function can also be passed:
            ::

                THRESHOLD=5
                def between_threshold(a, expected):
                    return abs(a - expected) <= THRESHOLD

        :type operator: function

        :param window: A (start, end) tuple to limit the scope of the
            residency calculation.
        :type window: tuple

        :param percent: If true the result is normalized to the total runtime
            of the task and returned as a percentage
        :type percent: bool

        .. seealso:: :mod:`bart.sched.SchedAssert.SchedAssert.getResidency`
        """
        node_value = self.getResidency(level, node, window, percent)
        return operator(node_value, expected_value)

    def getStartTime(self):
        """
        :return: The first time the task ran across all the CPUs
        """

        agg = self._aggregator(sched_funcs.first_time)
        result = agg.aggregate(level="all", value=sched_funcs.TASK_RUNNING)
        return min(result[0])

    def getEndTime(self):
        """
        :return: The last time the task ran across
            all the CPUs
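
        The start and end times can be combined to build a window spanning
        the task's lifetime, e.g. (a minimal sketch, where ``sa`` is a
        ``SchedAssert`` instance)::

            window = (sa.getStartTime(), sa.getEndTime())
            runtime = sa.getRuntime(window=window)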
        """

        agg = self._aggregator(sched_funcs.last_time)
        result = agg.aggregate(level="all", value=sched_funcs.TASK_RUNNING)
        return max(result[0])

    def _relax_switch_window(self, series, direction, window):
        """
            direction == "left"
                return the last time the task was running
                in the window. If no such time exists in the
                window, extend the window's left extent to
                getStartTime()

            direction == "right"
                return the first time the task was running
                in the window. If no such time exists in the
                window, extend the window's right extent to
                getEndTime()

            The function returns None if
            len(series[series == TASK_RUNNING]) == 0
            even in the extended window
        """

        series = series[series == sched_funcs.TASK_RUNNING]
        w_series = sched_funcs.select_window(series, window)
        start, stop = window

        if direction == "left":
            if len(w_series):
                return w_series.index.values[-1]
            else:
                start_time = self.getStartTime()
                w_series = sched_funcs.select_window(
                    series,
                    window=(
                        start_time,
                        start))

                if not len(w_series):
                    return None
                else:
                    return w_series.index.values[-1]

        elif direction == "right":
            if len(w_series):
                return w_series.index.values[0]
            else:
                end_time = self.getEndTime()
                w_series = sched_funcs.select_window(series, window=(stop, end_time))

                if not len(w_series):
                    return None
                else:
                    return w_series.index.values[0]
        else:
            raise ValueError("direction should be either left or right")

    def assertSwitch(
            self,
            level,
            from_node,
            to_node,
            window,
            ignore_multiple=True):
        """
        This function asserts that there is a context switch from
        :code:`from_node` to :code:`to_node`:

        :param level: The topological level to which the group belongs
        :type level: str

        :param from_node: The node from which the task switches out
        :type from_node: list

        :param to_node: The node to which the task switches
        :type to_node: list

        :param window: A (start, end) tuple to limit the scope of the
            check.
        :type window: tuple

        :param ignore_multiple: If True, the function will ignore multiple
            switches in the window. If False, the assertion holds only if
            there is a single switch within the specified window
        :type ignore_multiple: bool
        """

        from_node_index = self._topology.get_index(level, from_node)
        to_node_index = self._topology.get_index(level, to_node)

        agg = self._aggregator(sched_funcs.csum)
        level_result = agg.aggregate(level=level)

        from_node_result = level_result[from_node_index]
        to_node_result = level_result[to_node_index]

        from_time = self._relax_switch_window(from_node_result, "left", window)
        if ignore_multiple:
            to_time = self._relax_switch_window(to_node_result, "left", window)
        else:
            to_time = self._relax_switch_window(
                to_node_result,
                "right", window)

        if from_time and to_time:
            if from_time < to_time:
                return True

        return False

    def getRuntime(self, window=None, percent=False):
        """Return the total runtime of a task

        :param window: A (start, end) tuple to limit the scope of the
            runtime calculation.
        :type window: tuple

        :param percent: If True, the result is returned
            as a percentage of the total execution time
            of the run.
        :type percent: bool

        .. seealso:: :mod:`bart.sched.SchedAssert.SchedAssert.assertRuntime`
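
        For example (a minimal sketch; the window values are illustrative)::

            total = sa.getRuntime()
            window_pct = sa.getRuntime(window=(1.0, 2.0), percent=True)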
        """

        agg = self._aggregator(sched_funcs.residency_sum)
        run_time = agg.aggregate(level="all", window=window)[0]

        if percent:

            if window:
                begin, end = window
                total_time = end - begin
            else:
                total_time = self._ftrace.get_duration()

            run_time = run_time * 100
            run_time = run_time / total_time

        return run_time

    def assertRuntime(
            self,
            expected_value,
            operator,
            window=None,
            percent=False):
        """Assert on the total runtime of the task

        :param expected_value: The expected value of the runtime
        :type expected_value: double

        :param operator: A binary operator function that returns
            a boolean. For example:
            ::

                import operator
                op = operator.ge
                assertRuntime(expected_value, op)

            Will do the following check:
            ::

                getRuntime() >= expected_value

            A custom function can also be passed:
            ::

                THRESHOLD=5
                def between_threshold(a, expected):
                    return abs(a - expected) <= THRESHOLD

        :type operator: function

        :param window: A (start, end) tuple to limit the scope of the
            runtime calculation.
        :type window: tuple

        :param percent: If True, the result is returned
            as a percentage of the total execution time
            of the run.
        :type percent: bool

        .. seealso:: :mod:`bart.sched.SchedAssert.SchedAssert.getRuntime`
        """

        run_time = self.getRuntime(window, percent)
        return operator(run_time, expected_value)

    def getPeriod(self, window=None, align="start"):
        """Return the period of the task in milliseconds (ms)

        Let's say a task started execution at the following times:

            .. math::

                T_1, T_2, ...T_n

        The period is defined as:

            .. math::

                Median((T_2 - T_1), (T_4 - T_3), ....(T_n - T_{n-1}))

        :param window: A (start, end) tuple to limit the scope of the
            period calculation.
        :type window: tuple

        :param align:
            :code:`"start"` aligns period calculation to switch-in events
            :code:`"end"` aligns the calculation to switch-out events
        :type align: str

        .. seealso:: :mod:`bart.sched.SchedAssert.SchedAssert.assertPeriod`
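
        For example (a minimal sketch; the window values are illustrative)::

            period_ms = sa.getPeriod(window=(0, 5), align="start")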
        """

        agg = self._aggregator(sched_funcs.period)
        deltas = agg.aggregate(level="all", window=window)[0]

        if not len(deltas):
            return float("NaN")
        else:
            return np.median(deltas) * 1000

    def assertPeriod(
            self,
            expected_value,
            operator,
            window=None,
            align="start"):
        """Assert on the period of the task

        :param expected_value: The expected value of the period
        :type expected_value: double

        :param operator: A binary operator function that returns
            a boolean. For example:
            ::

                import operator
                op = operator.ge
                assertPeriod(expected_value, op)

            Will do the following check:
            ::

                getPeriod() >= expected_value

            A custom function can also be passed:
            ::

                THRESHOLD=5
                def between_threshold(a, expected):
                    return abs(a - expected) <= THRESHOLD

        :type operator: function

        :param window: A (start, end) tuple to limit the scope of the
            calculation.
        :type window: tuple

        :param align:
            :code:`"start"` aligns period calculation to switch-in events
            :code:`"end"` aligns the calculation to switch-out events
        :type align: str

        .. seealso:: :mod:`bart.sched.SchedAssert.SchedAssert.getPeriod`
        """

        period = self.getPeriod(window, align)
        return operator(period, expected_value)

    def getDutyCycle(self, window):
        """Return the duty cycle of the task

        :param window: A (start, end) tuple to limit the scope of the
            calculation.
        :type window: tuple

        Duty Cycle:
            The percentage of time the task spends executing
            in the given window of time

            .. math::

                \\delta_{cycle} = \\frac{T_{exec} \\times 100}{T_{window}}

        .. seealso:: :mod:`bart.sched.SchedAssert.SchedAssert.assertDutyCycle`
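
        For example (the window values are illustrative)::

            duty_pct = sa.getDutyCycle(window=(0, 1))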
        """

        return self.getRuntime(window, percent=True)

    def assertDutyCycle(self, expected_value, operator, window):
        """
        :param expected_value: The expected value of the duty cycle
        :type expected_value: double

        :param operator: A binary operator function that returns
            a boolean. For example:
            ::

                import operator
                op = operator.ge
                assertDutyCycle(expected_value, op, window)

            Will do the following check:
            ::

                getDutyCycle(window) >= expected_value

            A custom function can also be passed:
            ::

                THRESHOLD=5
                def between_threshold(a, expected):
                    return abs(a - expected) <= THRESHOLD

        :type operator: function

        :param window: A (start, end) tuple to limit the scope of the
            calculation.
        :type window: tuple

        .. seealso:: :mod:`bart.sched.SchedAssert.SchedAssert.getDutyCycle`
        """
        return self.assertRuntime(
            expected_value,
            operator,
            window,
            percent=True)

    def getFirstCpu(self, window=None):
        """
        :return: The first CPU the task ran on

        .. seealso:: :mod:`bart.sched.SchedAssert.SchedAssert.assertFirstCpu`
        """

        agg = self._aggregator(sched_funcs.first_cpu)
        result = agg.aggregate(level="cpu", window=window)
        result = list(itertools.chain.from_iterable(result))

        min_time = min(result)
        if math.isinf(min_time):
            return -1
        index = result.index(min_time)
        return self._topology.get_node("cpu", index)[0]

    def assertFirstCpu(self, cpus, window=None):
        """
        Check whether the task started (i.e. first ran, within the duration
        of the trace) on one of a given set of CPUs

        :param cpus: A list of acceptable CPUs
        :type cpus: int, list

        .. seealso:: :mod:`bart.sched.SchedAssert.SchedAssert.getFirstCpu`
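
        For example (the CPU numbers are illustrative)::

            sa.assertFirstCpu([1, 2])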
        """

        first_cpu = self.getFirstCpu(window=window)
        cpus = Utils.listify(cpus)
        return first_cpu in cpus

    def getLastCpu(self, window=None):
        """Return the last CPU the task ran on"""

        agg = self._aggregator(sched_funcs.last_cpu)
        result = agg.aggregate(level="cpu", window=window)
        result = list(itertools.chain.from_iterable(result))

        end_time = max(result)
        if not end_time:
            return -1

        return result.index(end_time)

    def generate_events(self, level, start_id=0, window=None):
        """Generate events for the trace plot

        .. note::
            This is an internal function accessed by the
            :mod:`bart.sched.SchedMultiAssert` class for plotting data
        """

        agg = self._aggregator(sched_funcs.trace_event)
        result = agg.aggregate(level=level, window=window)
        events = []

        for idx, level_events in enumerate(result):
            if not len(level_events):
                continue
            events += np.column_stack((level_events, np.full(len(level_events), idx))).tolist()

        return sorted(events, key=lambda x: x[0])

    def plot(self, level="cpu", window=None, xlim=None):
        """
        :return: :mod:`trappy.plotter.AbstractDataPlotter` instance
            Call :func:`view` to draw the graph
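
        For example, in an IPython notebook (a minimal sketch)::

            sa.plot(level="cluster").view()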
        """

        if not xlim:
            if not window:
                xlim = [0, self._ftrace.get_duration()]
            else:
                xlim = list(window)

        events = {}
        events[self.name] = self.generate_events(level, window)
        names = [self.name]
        num_lanes = self._topology.level_span(level)
        lane_prefix = level.upper() + ": "
        return trappy.EventPlot(events, names, xlim,
                                lane_prefix=lane_prefix,
                                num_lanes=num_lanes)
    667