#    Copyright 2015-2017 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Aggregators are responsible for aggregating information
for further analysis. These aggregations can produce
both scalars and vectors, and each aggregator implementation
is expected to handle its own aggregation mechanism.
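
A typical flow looks roughly like the sketch below. This is only a
minimal sketch: the trigger and topology objects are assumed to have
been built already (see :mod:`trappy.stats.Trigger` and
:mod:`trappy.stats.Topology`), and the ``"cluster"`` level name is an
illustration that depends on how the topology was constructed::

    from trappy.stats.Aggregator import MultiTriggerAggregator

    # triggers: a list of Trigger objects, topology: a Topology object
    aggregator = MultiTriggerAggregator(triggers, topology)

    # One aggregated element per group at the requested level
    results = aggregator.aggregate(level="cluster")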
     20 """
     21 
     22 
     23 from trappy.utils import listify
     24 from trappy.stats.Indexer import MultiTriggerIndexer
     25 from abc import ABCMeta, abstractmethod
     26 
     27 
     28 class AbstractAggregator(object):
    """Abstract class for all aggregators

    :param indexer: Indexer is passed on by the child class
        for handling indices during correlation
    :type indexer: :mod:`trappy.stats.Indexer.Indexer`

    :param aggfunc: Function that accepts a pandas.Series and
        processes it for aggregation.
    :type aggfunc: function
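
    For example, an ``aggfunc`` could reduce each series to a scalar
    (a minimal sketch; any callable with this shape can be used)::

        def total(series, **kwargs):
            # Sum the values of the series
            return series.sum()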
    """

    __metaclass__ = ABCMeta

    # The current implementation needs the index to be unified
    # across data frames to account for variable sampling rates
    def __init__(self, indexer, aggfunc=None):

        self._result = {}
        self._aggregated = False
        self._aggfunc = aggfunc
        self.indexer = indexer

    def _add_result(self, pivot, series):
        """Add the result for the given pivot and trace

        :param pivot: The pivot for which the result is being generated
        :type pivot: hashable

        :param series: Series to be added to the result
        :type series: :mod:`pandas.Series`
        """

        if pivot not in self._result:
            self._result[pivot] = self.indexer.series()

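        # Copy the incoming values onto the pivot's series, which is built
        # on the indexer's unified index (a common time base across traces)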
        for idx in series.index:
            self._result[pivot][idx] = series[idx]

    @abstractmethod
    def aggregate(self, trace_idx, **kwargs):
        """Abstract method for aggregating data for various
        pivots.

        :param trace_idx: Index of the trace to be aggregated
        :type trace_idx: int

        :return: The aggregated result

        """

        raise NotImplementedError("Method Not Implemented")


class MultiTriggerAggregator(AbstractAggregator):
    """This aggregator accepts a list of triggers and each trigger has
    a value associated with it.
    """

    def __init__(self, triggers, topology, aggfunc=None):
        """
        :param triggers: A list of (or a single) trigger object(s)
        :type triggers: :mod:`trappy.stats.Trigger.Trigger`

        :param topology: A topology object specifying the levels at
                which the triggers can be aggregated
        :type topology: :mod:`trappy.stats.Topology`

        :param aggfunc: A function to be applied on each series being aggregated.
                For each topology node, a series will be generated and this
                will be processed by the aggfunc
        :type aggfunc: function
        """
        self._triggers = triggers
        self.topology = topology
        super(MultiTriggerAggregator, self).__init__(
            MultiTriggerIndexer(triggers), aggfunc)

    def aggregate(self, **kwargs):
        """
        Aggregate the triggers for a given topological level. All the
        arguments are forwarded to the aggregation function except
        ``level`` (if present).

        :return: A scalar or a vector aggregated result. Each group in the
            level produces an element in the result list with a one to one
            index correspondence
            ::

                groups["level"] = [[1, 2], [3, 4]]
                result = [result_1, result_2]
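
        For example (a sketch, assuming ``aggregator`` is a constructed
        :mod:`MultiTriggerAggregator`; the available level names depend
        on how the :mod:`trappy.stats.Topology` object was built)::

            # One result per group at a hypothetical "cluster" level
            cluster_results = aggregator.aggregate(level="cluster")

            # Aggregate over the default "all" level
            system_result = aggregator.aggregate(level="all")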
        """

        level = kwargs.pop("level", "all")

        # This function is a hot spot in the code. It is
        # worth considering a memoize decorator to cache
        # the results. The memoization can also be
        # maintained by the aggregator object. This will
        # help the code scale efficiently
        level_groups = self.topology.get_level(level)
        result = []

        if not self._aggregated:
            self._aggregate_base()

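        # For each group in the requested level, take the first node's
        # (optionally aggfunc-processed) result and add in the results of
        # the remaining nodes: series add element-wise, scalars just sum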
        for group in level_groups:
            group = listify(group)
            if self._aggfunc is not None:
                level_res = self._aggfunc(self._result[group[0]], **kwargs)
            else:
                level_res = self._result[group[0]]

            for node in group[1:]:
                if self._aggfunc is not None:
                    node_res = self._aggfunc(self._result[node], **kwargs)
                else:
                    node_res = self._result[node]

                level_res += node_res

            result.append(level_res)

        return result

    def _aggregate_base(self):
        """A memoized function to generate the base series
        for each node in the flattened topology
        ::

            topo["level_1"] = [[1, 2], [3, 4]]

        This function will generate the fundamental
        aggregations for all nodes 1, 2, 3, 4 and
        store the result in self._result
        """

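        # Generate one base series per (trigger, node) pair; series produced
        # by different triggers for the same node are merged into the same
        # per-node series on the indexer's unified index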
        for trigger in self._triggers:
            for node in self.topology.flatten():
                result_series = trigger.generate(node)
                self._add_result(node, result_series)

        self._aggregated = True
    175