      1 /*
      2  * Copyright 2017, OpenCensus Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *     http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     17 package io.opencensus.implcore.stats;
     19 import static com.google.common.base.Preconditions.checkArgument;
     20 import static io.opencensus.implcore.stats.RecordUtils.createAggregationMap;
     21 import static io.opencensus.implcore.stats.RecordUtils.createMutableAggregation;
     22 import static io.opencensus.implcore.stats.RecordUtils.getTagMap;
     23 import static io.opencensus.implcore.stats.RecordUtils.getTagValues;
     25 import com.google.common.annotations.VisibleForTesting;
     26 import com.google.common.collect.LinkedHashMultimap;
     27 import com.google.common.collect.Maps;
     28 import com.google.common.collect.Multimap;
     29 import io.opencensus.common.Duration;
     30 import io.opencensus.common.Function;
     31 import io.opencensus.common.Functions;
     32 import io.opencensus.common.Timestamp;
     33 import io.opencensus.implcore.internal.CheckerFrameworkUtils;
     34 import io.opencensus.implcore.internal.CurrentState.State;
     35 import io.opencensus.metrics.LabelValue;
     36 import io.opencensus.metrics.export.Metric;
     37 import io.opencensus.metrics.export.MetricDescriptor;
     38 import io.opencensus.metrics.export.MetricDescriptor.Type;
     39 import io.opencensus.metrics.export.Point;
     40 import io.opencensus.metrics.export.TimeSeries;
     41 import io.opencensus.stats.Aggregation;
     42 import io.opencensus.stats.AggregationData;
     43 import io.opencensus.stats.Measure;
     44 import io.opencensus.stats.View;
     45 import io.opencensus.stats.ViewData;
     46 import io.opencensus.tags.TagContext;
     47 import io.opencensus.tags.TagValue;
     48 import java.util.ArrayDeque;
     49 import java.util.ArrayList;
     50 import java.util.Collections;
     51 import java.util.List;
     52 import java.util.Map;
     53 import java.util.Map.Entry;
     55 /*>>>
     56 import org.checkerframework.checker.nullness.qual.Nullable;
     57 */
     59 /** A mutable version of {@link ViewData}, used for recording stats and start/end time. */
     60 @SuppressWarnings("deprecation")
     61 abstract class MutableViewData {
     63   @VisibleForTesting static final Timestamp ZERO_TIMESTAMP = Timestamp.create(0, 0);
     65   private final View view;
  /**
   * Stores the {@link View} this instance belongs to. The constructor is private, so only the
   * nested subclasses declared in this file can extend {@code MutableViewData}.
   *
   * @param view the {@code View} linked with this {@code MutableViewData}.
   */
  private MutableViewData(View view) {
    this.view = view;
  }
     71   /**
     72    * Constructs a new {@link MutableViewData}.
     73    *
     74    * @param view the {@code View} linked with this {@code MutableViewData}.
     75    * @param start the start {@code Timestamp}.
     76    * @return a {@code MutableViewData}.
     77    */
     78   static MutableViewData create(final View view, final Timestamp start) {
     79     return view.getWindow()
     80         .match(
     81             new CreateCumulative(view, start),
     82             new CreateInterval(view, start),
     83             Functions.<MutableViewData>throwAssertionError());
     84   }
  /** Returns the {@link View} associated with this {@link MutableViewData}. */
  View getView() {
    return view;
  }
  /**
   * Converts the recorded stats to a {@link Metric}.
   *
   * @param now the time at which the conversion happens.
   * @param state the current stats collection state.
   * @return the resulting {@code Metric}, or {@code null} when the implementation has no metric to
   *     report (both implementations in this file can return {@code null}).
   */
  @javax.annotation.Nullable
  abstract Metric toMetric(Timestamp now, State state);
  /**
   * Record stats with the given tags.
   *
   * @param context the {@code TagContext} from which the tag values for this view are extracted.
   * @param value the measured value to record.
   * @param timestamp the time of the recording.
   * @param attachments key-value attachments that are handed on together with {@code value}.
   */
  abstract void record(
      TagContext context, double value, Timestamp timestamp, Map<String, String> attachments);
  /**
   * Convert this {@link MutableViewData} to {@link ViewData}.
   *
   * @param now the time at which the snapshot is taken.
   * @param state the current stats collection state; the implementations in this file return an
   *     empty {@code ViewData} unless it is {@code ENABLED}.
   * @return the {@code ViewData} snapshot.
   */
  abstract ViewData toViewData(Timestamp now, State state);
  /** Clears the stats recorded so far. */
  abstract void clearStats();
  /**
   * Resumes stats collection: resets the start {@code Timestamp} (for {@code
   * CumulativeMutableViewData}), or refreshes the bucket list (for {@code
   * IntervalMutableViewData}).
   *
   * @param now the time at which collection is resumed.
   */
  abstract void resumeStatsCollection(Timestamp now);
    108   private static final class CumulativeMutableViewData extends MutableViewData {
    110     private Timestamp start;
    111     private final Map<List</*@Nullable*/ TagValue>, MutableAggregation> tagValueAggregationMap =
    112         Maps.newHashMap();
    113     // Cache a MetricDescriptor to avoid converting View to MetricDescriptor in the future.
    114     private final MetricDescriptor metricDescriptor;
    116     private CumulativeMutableViewData(View view, Timestamp start) {
    117       super(view);
    118       this.start = start;
    119       MetricDescriptor metricDescriptor = MetricUtils.viewToMetricDescriptor(view);
    120       if (metricDescriptor == null) {
    121         throw new AssertionError(
    122             "Cumulative view should be converted to a non-null MetricDescriptor.");
    123       } else {
    124         this.metricDescriptor = metricDescriptor;
    125       }
    126     }
    128     @javax.annotation.Nullable
    129     @Override
    130     Metric toMetric(Timestamp now, State state) {
    131       if (state == State.DISABLED) {
    132         return null;
    133       }
    134       Type type = metricDescriptor.getType();
    135       @javax.annotation.Nullable
    136       Timestamp startTime = type == Type.GAUGE_INT64 || type == Type.GAUGE_DOUBLE ? null : start;
    137       List<TimeSeries> timeSeriesList = new ArrayList<TimeSeries>();
    138       for (Entry<List</*@Nullable*/ TagValue>, MutableAggregation> entry :
    139           tagValueAggregationMap.entrySet()) {
    140         List<LabelValue> labelValues = MetricUtils.tagValuesToLabelValues(entry.getKey());
    141         Point point = entry.getValue().toPoint(now);
    142         timeSeriesList.add(TimeSeries.createWithOnePoint(labelValues, point, startTime));
    143       }
    144       return Metric.create(metricDescriptor, timeSeriesList);
    145     }
    147     @Override
    148     void record(
    149         TagContext context, double value, Timestamp timestamp, Map<String, String> attachments) {
    150       List</*@Nullable*/ TagValue> tagValues =
    151           getTagValues(getTagMap(context), super.view.getColumns());
    152       if (!tagValueAggregationMap.containsKey(tagValues)) {
    153         tagValueAggregationMap.put(
    154             tagValues,
    155             createMutableAggregation(super.view.getAggregation(), super.getView().getMeasure()));
    156       }
    157       tagValueAggregationMap.get(tagValues).add(value, attachments, timestamp);
    158     }
    160     @Override
    161     ViewData toViewData(Timestamp now, State state) {
    162       if (state == State.ENABLED) {
    163         return ViewData.create(
    164             super.view,
    165             createAggregationMap(tagValueAggregationMap, super.view.getMeasure()),
    166             ViewData.AggregationWindowData.CumulativeData.create(start, now));
    167       } else {
    168         // If Stats state is DISABLED, return an empty ViewData.
    169         return ViewData.create(
    170             super.view,
    171             Collections.<List</*@Nullable*/ TagValue>, AggregationData>emptyMap(),
    172             ViewData.AggregationWindowData.CumulativeData.create(ZERO_TIMESTAMP, ZERO_TIMESTAMP));
    173       }
    174     }
    /** Drops every recorded aggregation; the start timestamp is left as it is. */
    @Override
    void clearStats() {
      tagValueAggregationMap.clear();
    }
    /**
     * Restarts the cumulative window at {@code now}. Aggregations already held in the map are not
     * modified by this method.
     */
    @Override
    void resumeStatsCollection(Timestamp now) {
      start = now;
    }
    185   }
    187   /*
    188    * For each IntervalView, we always keep a queue of N + 1 buckets (by default N is 4).
    189    * Each bucket has a duration which is interval duration / N.
    190    * Ideally:
    191    * 1. the buckets should always be up-to-date,
    192    * 2. current time should always be within the latest bucket, currently recorded stats should fall
    193    *    into the latest bucket,
    194    * 3. there are always N buckets before the current one, which holds the stats in the past
    195    *    interval duration.
    196    *
    197    * When getView() is called, we will extract and combine the stats from the current and past
    198    * buckets (part of the stats from the oldest bucket could have expired).
    199    *
    200    * However, in reality, we couldn't track the status of buckets all the time (keep monitoring and
    201    * updating the bucket queue will be expensive). When we call record() or getView(), some or all
    202    * of the buckets might be outdated, and we will need to "pad" new buckets to the queue and remove
   * outdated ones. After refreshing buckets, the bucket queue will be able to maintain the three
    204    * invariants in the ideal situation.
    205    *
    206    * For example:
    207    * 1. We have an IntervalView which has a duration of 8 seconds, we register this view at 10s.
    208    * 2. Initially there will be 5 buckets: [2.0, 4.0), [4.0, 6.0), ..., [10.0, 12.0).
    209    * 3. If users don't call record() or getView(), bucket queue will remain as it is, and some
    210    *    buckets could expire.
    211    * 4. Suppose record() is called at 15s, now we need to refresh the bucket queue. We need to add
    212    *    two new buckets [12.0, 14.0) and [14.0, 16.0), and remove two expired buckets [2.0, 4.0)
    213    *    and [4.0, 6.0)
    214    * 5. Suppose record() is called again at 30s, all the current buckets should have expired. We add
    215    *    5 new buckets [22.0, 24.0) ... [30.0, 32.0) and remove all the previous buckets.
   * 6. Suppose users call getView() at 35s, again we need to add two new buckets and remove two
   *    expired ones, so that the bucket queue is up-to-date. Now we combine stats from all
   *    buckets and return the combined IntervalViewData.
    219    */
    220   private static final class IntervalMutableViewData extends MutableViewData {
    222     // TODO(songya): allow customizable bucket size in the future.
    223     private static final int N = 4; // IntervalView has N + 1 buckets
    225     private final ArrayDeque<IntervalBucket> buckets = new ArrayDeque<IntervalBucket>();
    227     private final Duration totalDuration; // Duration of the whole interval.
    228     private final Duration bucketDuration; // Duration of a single bucket (totalDuration / N)
    230     private IntervalMutableViewData(View view, Timestamp start) {
    231       super(view);
    232       Duration totalDuration = ((View.AggregationWindow.Interval) view.getWindow()).getDuration();
    233       this.totalDuration = totalDuration;
    234       this.bucketDuration = Duration.fromMillis(totalDuration.toMillis() / N);
    236       // When initializing. add N empty buckets prior to the start timestamp of this
    237       // IntervalMutableViewData, so that the last bucket will be the current one in effect.
    238       shiftBucketList(N + 1, start);
    239     }
    /** Always returns {@code null}: this interval implementation produces no {@link Metric}. */
    @javax.annotation.Nullable
    @Override
    Metric toMetric(Timestamp now, State state) {
      return null;
    }
    247     @Override
    248     void record(
    249         TagContext context, double value, Timestamp timestamp, Map<String, String> attachments) {
    250       List</*@Nullable*/ TagValue> tagValues =
    251           getTagValues(getTagMap(context), super.view.getColumns());
    252       refreshBucketList(timestamp);
    253       // It is always the last bucket that does the recording.
    254       CheckerFrameworkUtils.castNonNull(buckets.peekLast())
    255           .record(tagValues, value, attachments, timestamp);
    256     }
    258     @Override
    259     ViewData toViewData(Timestamp now, State state) {
    260       refreshBucketList(now);
    261       if (state == State.ENABLED) {
    262         return ViewData.create(
    263             super.view,
    264             combineBucketsAndGetAggregationMap(now),
    265             ViewData.AggregationWindowData.IntervalData.create(now));
    266       } else {
    267         // If Stats state is DISABLED, return an empty ViewData.
    268         return ViewData.create(
    269             super.view,
    270             Collections.<List</*@Nullable*/ TagValue>, AggregationData>emptyMap(),
    271             ViewData.AggregationWindowData.IntervalData.create(ZERO_TIMESTAMP));
    272       }
    273     }
    /** Asks every bucket in the queue to clear its stats; the queue itself is left unchanged. */
    @Override
    void clearStats() {
      for (IntervalBucket bucket : buckets) {
        bucket.clearStats();
      }
    }
    /**
     * Brings the bucket queue up to date with {@code now} when stats collection is resumed.
     *
     * @param now the time at which collection is resumed.
     */
    @Override
    void resumeStatsCollection(Timestamp now) {
      // Refresh bucket list to be ready for stats recording, so that if record() is called right
      // after stats state is turned back on, record() will be faster.
      refreshBucketList(now);
    }
    289     // Add new buckets and remove expired buckets by comparing the current timestamp with
    290     // timestamp of the last bucket.
    291     private void refreshBucketList(Timestamp now) {
    292       if (buckets.size() != N + 1) {
    293         throw new AssertionError("Bucket list must have exactly " + (N + 1) + " buckets.");
    294       }
    295       Timestamp startOfLastBucket =
    296           CheckerFrameworkUtils.castNonNull(buckets.peekLast()).getStart();
    297       // TODO(songya): decide what to do when time goes backwards
    298       checkArgument(
    299           now.compareTo(startOfLastBucket) >= 0,
    300           "Current time must be within or after the last bucket.");
    301       long elapsedTimeMillis = now.subtractTimestamp(startOfLastBucket).toMillis();
    302       long numOfPadBuckets = elapsedTimeMillis / bucketDuration.toMillis();
    304       shiftBucketList(numOfPadBuckets, now);
    305     }
    307     // Add specified number of new buckets, and remove expired buckets
    308     private void shiftBucketList(long numOfPadBuckets, Timestamp now) {
    309       Timestamp startOfNewBucket;
    311       if (!buckets.isEmpty()) {
    312         startOfNewBucket =
    313             CheckerFrameworkUtils.castNonNull(buckets.peekLast())
    314                 .getStart()
    315                 .addDuration(bucketDuration);
    316       } else {
    317         // Initialize bucket list. Should only enter this block once.
    318         startOfNewBucket = subtractDuration(now, totalDuration);
    319       }
    321       if (numOfPadBuckets > N + 1) {
    322         // All current buckets expired, need to add N + 1 new buckets. The start time of the latest
    323         // bucket will be current time.
    324         startOfNewBucket = subtractDuration(now, totalDuration);
    325         numOfPadBuckets = N + 1;
    326       }
    328       for (int i = 0; i < numOfPadBuckets; i++) {
    329         buckets.add(
    330             new IntervalBucket(
    331                 startOfNewBucket,
    332                 bucketDuration,
    333                 super.view.getAggregation(),
    334                 super.view.getMeasure()));
    335         startOfNewBucket = startOfNewBucket.addDuration(bucketDuration);
    336       }
    338       // removed expired buckets
    339       while (buckets.size() > N + 1) {
    340         buckets.pollFirst();
    341       }
    342     }
    344     // Combine stats within each bucket, aggregate stats by tag values, and return the mapping from
    345     // tag values to aggregation data.
    346     private Map<List</*@Nullable*/ TagValue>, AggregationData> combineBucketsAndGetAggregationMap(
    347         Timestamp now) {
    348       // Need to maintain the order of inserted MutableAggregations (inserted based on time order).
    349       Multimap<List</*@Nullable*/ TagValue>, MutableAggregation> multimap =
    350           LinkedHashMultimap.create();
    352       ArrayDeque<IntervalBucket> shallowCopy = new ArrayDeque<IntervalBucket>(buckets);
    354       Aggregation aggregation = super.view.getAggregation();
    355       Measure measure = super.view.getMeasure();
    356       putBucketsIntoMultiMap(shallowCopy, multimap, aggregation, measure, now);
    357       Map<List</*@Nullable*/ TagValue>, MutableAggregation> singleMap =
    358           aggregateOnEachTagValueList(multimap, aggregation, measure);
    359       return createAggregationMap(singleMap, super.getView().getMeasure());
    360     }
    362     // Put stats within each bucket to a multimap. Each tag value list (map key) could have multiple
    363     // mutable aggregations (map value) from different buckets.
    364     private static void putBucketsIntoMultiMap(
    365         ArrayDeque<IntervalBucket> buckets,
    366         Multimap<List</*@Nullable*/ TagValue>, MutableAggregation> multimap,
    367         Aggregation aggregation,
    368         Measure measure,
    369         Timestamp now) {
    370       // Put fractional stats of the head (oldest) bucket.
    371       IntervalBucket head = CheckerFrameworkUtils.castNonNull(buckets.peekFirst());
    372       IntervalBucket tail = CheckerFrameworkUtils.castNonNull(buckets.peekLast());
    373       double fractionTail = tail.getFraction(now);
    374       // TODO(songya): decide what to do when time goes backwards
    375       checkArgument(
    376           0.0 <= fractionTail && fractionTail <= 1.0,
    377           "Fraction " + fractionTail + " should be within [0.0, 1.0].");
    378       double fractionHead = 1.0 - fractionTail;
    379       putFractionalMutableAggregationsToMultiMap(
    380           head.getTagValueAggregationMap(), multimap, aggregation, measure, fractionHead);
    382       // Put whole data of other buckets.
    383       boolean shouldSkipFirst = true;
    384       for (IntervalBucket bucket : buckets) {
    385         if (shouldSkipFirst) {
    386           shouldSkipFirst = false;
    387           continue; // skip the first bucket
    388         }
    389         for (Entry<List</*@Nullable*/ TagValue>, MutableAggregation> entry :
    390             bucket.getTagValueAggregationMap().entrySet()) {
    391           multimap.put(entry.getKey(), entry.getValue());
    392         }
    393       }
    394     }
    396     // Put stats within one bucket into multimap, multiplied by a given fraction.
    397     private static <T> void putFractionalMutableAggregationsToMultiMap(
    398         Map<T, MutableAggregation> mutableAggrMap,
    399         Multimap<T, MutableAggregation> multimap,
    400         Aggregation aggregation,
    401         Measure measure,
    402         double fraction) {
    403       for (Entry<T, MutableAggregation> entry : mutableAggrMap.entrySet()) {
    404         // Initially empty MutableAggregations.
    405         MutableAggregation fractionalMutableAgg = createMutableAggregation(aggregation, measure);
    406         fractionalMutableAgg.combine(entry.getValue(), fraction);
    407         multimap.put(entry.getKey(), fractionalMutableAgg);
    408       }
    409     }
    411     // For each tag value list (key of AggregationMap), combine mutable aggregations into one
    412     // mutable aggregation, thus convert the multimap into a single map.
    413     private static <T> Map<T, MutableAggregation> aggregateOnEachTagValueList(
    414         Multimap<T, MutableAggregation> multimap, Aggregation aggregation, Measure measure) {
    415       Map<T, MutableAggregation> map = Maps.newHashMap();
    416       for (T tagValues : multimap.keySet()) {
    417         // Initially empty MutableAggregations.
    418         MutableAggregation combinedAggregation = createMutableAggregation(aggregation, measure);
    419         for (MutableAggregation mutableAggregation : multimap.get(tagValues)) {
    420           combinedAggregation.combine(mutableAggregation, 1.0);
    421         }
    422         map.put(tagValues, combinedAggregation);
    423       }
    424       return map;
    425     }
    427     // Subtract a Duration from a Timestamp, and return a new Timestamp.
    428     private static Timestamp subtractDuration(Timestamp timestamp, Duration duration) {
    429       return timestamp.addDuration(Duration.create(-duration.getSeconds(), -duration.getNanos()));
    430     }
    431   }
    433   private static final class CreateCumulative
    434       implements Function<View.AggregationWindow.Cumulative, MutableViewData> {
    435     @Override
    436     public MutableViewData apply(View.AggregationWindow.Cumulative arg) {
    437       return new CumulativeMutableViewData(view, start);
    438     }
    440     private final View view;
    441     private final Timestamp start;
    443     private CreateCumulative(View view, Timestamp start) {
    444       this.view = view;
    445       this.start = start;
    446     }
    447   }
    449   private static final class CreateInterval
    450       implements Function<View.AggregationWindow.Interval, MutableViewData> {
    451     @Override
    452     public MutableViewData apply(View.AggregationWindow.Interval arg) {
    453       return new IntervalMutableViewData(view, start);
    454     }
    456     private final View view;
    457     private final Timestamp start;
    459     private CreateInterval(View view, Timestamp start) {
    460       this.view = view;
    461       this.start = start;
    462     }
    463   }
    464 }