Home | History | Annotate | Download | only in stats
      1 /*
      2  * Copyright 2017, OpenCensus Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *     http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package io.opencensus.implcore.stats;
     18 
     19 import static com.google.common.base.Preconditions.checkArgument;
     20 import static com.google.common.base.Preconditions.checkNotNull;
     21 
     22 import com.google.common.annotations.VisibleForTesting;
     23 import io.opencensus.common.Timestamp;
     24 import io.opencensus.metrics.export.Distribution;
     25 import io.opencensus.metrics.export.Distribution.BucketOptions;
     26 import io.opencensus.metrics.export.Point;
     27 import io.opencensus.metrics.export.Value;
     28 import io.opencensus.stats.Aggregation;
     29 import io.opencensus.stats.AggregationData;
     30 import io.opencensus.stats.AggregationData.DistributionData;
     31 import io.opencensus.stats.AggregationData.DistributionData.Exemplar;
     32 import io.opencensus.stats.BucketBoundaries;
     33 import java.util.ArrayList;
     34 import java.util.List;
     35 import java.util.Map;
     36 
     37 /** Mutable version of {@link Aggregation} that supports adding values. */
     38 abstract class MutableAggregation {
     39 
     40   private MutableAggregation() {}
     41 
     42   // Tolerance for double comparison.
     43   private static final double TOLERANCE = 1e-6;
     44 
     45   /**
     46    * Put a new value into the MutableAggregation.
     47    *
     48    * @param value new value to be added to population
     49    * @param attachments the contextual information on an {@link Exemplar}
     50    * @param timestamp the timestamp when the value is recorded
     51    */
     52   abstract void add(double value, Map<String, String> attachments, Timestamp timestamp);
     53 
     54   // TODO(songya): remove this method once interval stats is completely removed.
     55   /**
     56    * Combine the internal values of this MutableAggregation and value of the given
     57    * MutableAggregation, with the given fraction. Then set the internal value of this
     58    * MutableAggregation to the combined value.
     59    *
     60    * @param other the other {@code MutableAggregation}. The type of this and other {@code
     61    *     MutableAggregation} must match.
     62    * @param fraction the fraction that the value in other {@code MutableAggregation} should
     63    *     contribute. Must be within [0.0, 1.0].
     64    */
     65   abstract void combine(MutableAggregation other, double fraction);
     66 
     67   abstract AggregationData toAggregationData();
     68 
     69   abstract Point toPoint(Timestamp timestamp);
     70 
     71   /** Calculate sum of doubles on aggregated {@code MeasureValue}s. */
     72   static class MutableSumDouble extends MutableAggregation {
     73 
     74     private double sum = 0.0;
     75 
     76     private MutableSumDouble() {}
     77 
     78     /**
     79      * Construct a {@code MutableSumDouble}.
     80      *
     81      * @return an empty {@code MutableSumDouble}.
     82      */
     83     static MutableSumDouble create() {
     84       return new MutableSumDouble();
     85     }
     86 
     87     @Override
     88     void add(double value, Map<String, String> attachments, Timestamp timestamp) {
     89       sum += value;
     90     }
     91 
     92     @Override
     93     void combine(MutableAggregation other, double fraction) {
     94       checkArgument(other instanceof MutableSumDouble, "MutableSumDouble expected.");
     95       this.sum += fraction * ((MutableSumDouble) other).sum;
     96     }
     97 
     98     @Override
     99     AggregationData toAggregationData() {
    100       return AggregationData.SumDataDouble.create(sum);
    101     }
    102 
    103     @Override
    104     Point toPoint(Timestamp timestamp) {
    105       return Point.create(Value.doubleValue(sum), timestamp);
    106     }
    107 
    108     @VisibleForTesting
    109     double getSum() {
    110       return sum;
    111     }
    112   }
    113 
    114   /** Calculate sum of longs on aggregated {@code MeasureValue}s. */
    115   static final class MutableSumLong extends MutableSumDouble {
    116     private MutableSumLong() {
    117       super();
    118     }
    119 
    120     /**
    121      * Construct a {@code MutableSumLong}.
    122      *
    123      * @return an empty {@code MutableSumLong}.
    124      */
    125     static MutableSumLong create() {
    126       return new MutableSumLong();
    127     }
    128 
    129     @Override
    130     AggregationData toAggregationData() {
    131       return AggregationData.SumDataLong.create(Math.round(getSum()));
    132     }
    133 
    134     @Override
    135     Point toPoint(Timestamp timestamp) {
    136       return Point.create(Value.longValue(Math.round(getSum())), timestamp);
    137     }
    138   }
    139 
    140   /** Calculate count on aggregated {@code MeasureValue}s. */
    141   static final class MutableCount extends MutableAggregation {
    142 
    143     private long count = 0;
    144 
    145     private MutableCount() {}
    146 
    147     /**
    148      * Construct a {@code MutableCount}.
    149      *
    150      * @return an empty {@code MutableCount}.
    151      */
    152     static MutableCount create() {
    153       return new MutableCount();
    154     }
    155 
    156     @Override
    157     void add(double value, Map<String, String> attachments, Timestamp timestamp) {
    158       count++;
    159     }
    160 
    161     @Override
    162     void combine(MutableAggregation other, double fraction) {
    163       checkArgument(other instanceof MutableCount, "MutableCount expected.");
    164       this.count += Math.round(fraction * ((MutableCount) other).getCount());
    165     }
    166 
    167     @Override
    168     AggregationData toAggregationData() {
    169       return AggregationData.CountData.create(count);
    170     }
    171 
    172     @Override
    173     Point toPoint(Timestamp timestamp) {
    174       return Point.create(Value.longValue(count), timestamp);
    175     }
    176 
    177     /**
    178      * Returns the aggregated count.
    179      *
    180      * @return the aggregated count.
    181      */
    182     long getCount() {
    183       return count;
    184     }
    185   }
    186 
    187   /** Calculate mean on aggregated {@code MeasureValue}s. */
    188   static final class MutableMean extends MutableAggregation {
    189 
    190     private double sum = 0.0;
    191     private long count = 0;
    192 
    193     private MutableMean() {}
    194 
    195     /**
    196      * Construct a {@code MutableMean}.
    197      *
    198      * @return an empty {@code MutableMean}.
    199      */
    200     static MutableMean create() {
    201       return new MutableMean();
    202     }
    203 
    204     @Override
    205     void add(double value, Map<String, String> attachments, Timestamp timestamp) {
    206       count++;
    207       sum += value;
    208     }
    209 
    210     @Override
    211     void combine(MutableAggregation other, double fraction) {
    212       checkArgument(other instanceof MutableMean, "MutableMean expected.");
    213       MutableMean mutableMean = (MutableMean) other;
    214       this.count += Math.round(mutableMean.count * fraction);
    215       this.sum += mutableMean.sum * fraction;
    216     }
    217 
    218     @SuppressWarnings("deprecation")
    219     @Override
    220     AggregationData toAggregationData() {
    221       return AggregationData.MeanData.create(getMean(), count);
    222     }
    223 
    224     @Override
    225     Point toPoint(Timestamp timestamp) {
    226       return Point.create(Value.doubleValue(getMean()), timestamp);
    227     }
    228 
    229     /**
    230      * Returns the aggregated mean.
    231      *
    232      * @return the aggregated mean.
    233      */
    234     double getMean() {
    235       return count == 0 ? 0 : sum / count;
    236     }
    237 
    238     /**
    239      * Returns the aggregated count.
    240      *
    241      * @return the aggregated count.
    242      */
    243     long getCount() {
    244       return count;
    245     }
    246 
    247     @VisibleForTesting
    248     double getSum() {
    249       return sum;
    250     }
    251   }
    252 
    253   /** Calculate distribution stats on aggregated {@code MeasureValue}s. */
    254   static final class MutableDistribution extends MutableAggregation {
    255 
    256     private double sum = 0.0;
    257     private double mean = 0.0;
    258     private long count = 0;
    259     private double sumOfSquaredDeviations = 0.0;
    260 
    261     // Initial "impossible" values, that will get reset as soon as first value is added.
    262     private double min = Double.POSITIVE_INFINITY;
    263     private double max = Double.NEGATIVE_INFINITY;
    264 
    265     private final BucketBoundaries bucketBoundaries;
    266     private final long[] bucketCounts;
    267 
    268     // If there's a histogram (i.e bucket boundaries are not empty) in this MutableDistribution,
    269     // exemplars will have the same size to bucketCounts; otherwise exemplars are null.
    270     // Only the newest exemplar will be kept at each index.
    271     @javax.annotation.Nullable private final Exemplar[] exemplars;
    272 
    273     private MutableDistribution(BucketBoundaries bucketBoundaries) {
    274       this.bucketBoundaries = bucketBoundaries;
    275       int buckets = bucketBoundaries.getBoundaries().size() + 1;
    276       this.bucketCounts = new long[buckets];
    277       // In the implementation, each histogram bucket can have up to one exemplar, and the exemplar
    278       // array is guaranteed to be in ascending order.
    279       // If there's no histogram, don't record exemplars.
    280       this.exemplars = bucketBoundaries.getBoundaries().isEmpty() ? null : new Exemplar[buckets];
    281     }
    282 
    283     /**
    284      * Construct a {@code MutableDistribution}.
    285      *
    286      * @return an empty {@code MutableDistribution}.
    287      */
    288     static MutableDistribution create(BucketBoundaries bucketBoundaries) {
    289       checkNotNull(bucketBoundaries, "bucketBoundaries should not be null.");
    290       return new MutableDistribution(bucketBoundaries);
    291     }
    292 
    293     @Override
    294     void add(double value, Map<String, String> attachments, Timestamp timestamp) {
    295       sum += value;
    296       count++;
    297 
    298       /*
    299        * Update the sum of squared deviations from the mean with the given value. For values
    300        * x_i this is Sum[i=1..n]((x_i - mean)^2)
    301        *
    302        * Computed using Welfords method (see
    303        * https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance, or Knuth, "The Art of
    304        * Computer Programming", Vol. 2, page 323, 3rd edition)
    305        */
    306       double deltaFromMean = value - mean;
    307       mean += deltaFromMean / count;
    308       double deltaFromMean2 = value - mean;
    309       sumOfSquaredDeviations += deltaFromMean * deltaFromMean2;
    310 
    311       if (value < min) {
    312         min = value;
    313       }
    314       if (value > max) {
    315         max = value;
    316       }
    317 
    318       int bucket = 0;
    319       for (; bucket < bucketBoundaries.getBoundaries().size(); bucket++) {
    320         if (value < bucketBoundaries.getBoundaries().get(bucket)) {
    321           break;
    322         }
    323       }
    324       bucketCounts[bucket]++;
    325 
    326       // No implicit recording for exemplars - if there are no attachments (contextual information),
    327       // don't record exemplars.
    328       if (!attachments.isEmpty() && exemplars != null) {
    329         exemplars[bucket] = Exemplar.create(value, timestamp, attachments);
    330       }
    331     }
    332 
    333     // We don't compute fractional MutableDistribution, it's either whole or none.
    334     @Override
    335     void combine(MutableAggregation other, double fraction) {
    336       checkArgument(other instanceof MutableDistribution, "MutableDistribution expected.");
    337       if (Math.abs(1.0 - fraction) > TOLERANCE) {
    338         return;
    339       }
    340 
    341       MutableDistribution mutableDistribution = (MutableDistribution) other;
    342       checkArgument(
    343           this.bucketBoundaries.equals(mutableDistribution.bucketBoundaries),
    344           "Bucket boundaries should match.");
    345 
    346       // Algorithm for calculating the combination of sum of squared deviations:
    347       // https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm.
    348       if (this.count + mutableDistribution.count > 0) {
    349         double delta = mutableDistribution.mean - this.mean;
    350         this.sumOfSquaredDeviations =
    351             this.sumOfSquaredDeviations
    352                 + mutableDistribution.sumOfSquaredDeviations
    353                 + Math.pow(delta, 2)
    354                     * this.count
    355                     * mutableDistribution.count
    356                     / (this.count + mutableDistribution.count);
    357       }
    358 
    359       this.count += mutableDistribution.count;
    360       this.sum += mutableDistribution.sum;
    361       this.mean = this.sum / this.count;
    362 
    363       if (mutableDistribution.min < this.min) {
    364         this.min = mutableDistribution.min;
    365       }
    366       if (mutableDistribution.max > this.max) {
    367         this.max = mutableDistribution.max;
    368       }
    369 
    370       long[] bucketCounts = mutableDistribution.getBucketCounts();
    371       for (int i = 0; i < bucketCounts.length; i++) {
    372         this.bucketCounts[i] += bucketCounts[i];
    373       }
    374 
    375       Exemplar[] otherExemplars = mutableDistribution.getExemplars();
    376       if (exemplars != null && otherExemplars != null) {
    377         for (int i = 0; i < otherExemplars.length; i++) {
    378           Exemplar exemplar = otherExemplars[i];
    379           // Assume other is always newer than this, because we combined interval buckets in time
    380           // order.
    381           // If there's a newer exemplar, overwrite current value.
    382           if (exemplar != null) {
    383             this.exemplars[i] = exemplar;
    384           }
    385         }
    386       }
    387     }
    388 
    389     @Override
    390     AggregationData toAggregationData() {
    391       List<Long> boxedBucketCounts = new ArrayList<Long>();
    392       for (long bucketCount : bucketCounts) {
    393         boxedBucketCounts.add(bucketCount);
    394       }
    395       List<Exemplar> exemplarList = new ArrayList<Exemplar>();
    396       if (exemplars != null) {
    397         for (Exemplar exemplar : exemplars) {
    398           if (exemplar != null) {
    399             exemplarList.add(exemplar);
    400           }
    401         }
    402       }
    403       return DistributionData.create(
    404           mean, count, min, max, sumOfSquaredDeviations, boxedBucketCounts, exemplarList);
    405     }
    406 
    407     @Override
    408     Point toPoint(Timestamp timestamp) {
    409       List<Distribution.Bucket> buckets = new ArrayList<Distribution.Bucket>();
    410       for (int bucket = 0; bucket < bucketCounts.length; bucket++) {
    411         long bucketCount = bucketCounts[bucket];
    412         @javax.annotation.Nullable AggregationData.DistributionData.Exemplar exemplar = null;
    413         if (exemplars != null) {
    414           exemplar = exemplars[bucket];
    415         }
    416 
    417         Distribution.Bucket metricBucket;
    418         if (exemplar != null) {
    419           // Bucket with an Exemplar.
    420           metricBucket =
    421               Distribution.Bucket.create(
    422                   bucketCount,
    423                   Distribution.Exemplar.create(
    424                       exemplar.getValue(), exemplar.getTimestamp(), exemplar.getAttachments()));
    425         } else {
    426           // Bucket with no Exemplar.
    427           metricBucket = Distribution.Bucket.create(bucketCount);
    428         }
    429         buckets.add(metricBucket);
    430       }
    431 
    432       // TODO(mayurkale): Drop the first bucket when converting to metrics.
    433       // Reason: In Stats API, bucket bounds begin with -infinity (first bucket is (-infinity, 0)).
    434       BucketOptions bucketOptions = BucketOptions.explicitOptions(bucketBoundaries.getBoundaries());
    435 
    436       return Point.create(
    437           Value.distributionValue(
    438               Distribution.create(
    439                   count, mean * count, sumOfSquaredDeviations, bucketOptions, buckets)),
    440           timestamp);
    441     }
    442 
    443     double getMean() {
    444       return mean;
    445     }
    446 
    447     long getCount() {
    448       return count;
    449     }
    450 
    451     double getMin() {
    452       return min;
    453     }
    454 
    455     double getMax() {
    456       return max;
    457     }
    458 
    459     // Returns the aggregated sum of squared deviations.
    460     double getSumOfSquaredDeviations() {
    461       return sumOfSquaredDeviations;
    462     }
    463 
    464     long[] getBucketCounts() {
    465       return bucketCounts;
    466     }
    467 
    468     BucketBoundaries getBucketBoundaries() {
    469       return bucketBoundaries;
    470     }
    471 
    472     @javax.annotation.Nullable
    473     Exemplar[] getExemplars() {
    474       return exemplars;
    475     }
    476   }
    477 
    478   /** Calculate double last value on aggregated {@code MeasureValue}s. */
    479   static class MutableLastValueDouble extends MutableAggregation {
    480 
    481     // Initial value that will get reset as soon as first value is added.
    482     private double lastValue = Double.NaN;
    483     // TODO(songya): remove this once interval stats is completely removed.
    484     private boolean initialized = false;
    485 
    486     private MutableLastValueDouble() {}
    487 
    488     /**
    489      * Construct a {@code MutableLastValueDouble}.
    490      *
    491      * @return an empty {@code MutableLastValueDouble}.
    492      */
    493     static MutableLastValueDouble create() {
    494       return new MutableLastValueDouble();
    495     }
    496 
    497     @Override
    498     void add(double value, Map<String, String> attachments, Timestamp timestamp) {
    499       lastValue = value;
    500       // TODO(songya): remove this once interval stats is completely removed.
    501       if (!initialized) {
    502         initialized = true;
    503       }
    504     }
    505 
    506     @Override
    507     void combine(MutableAggregation other, double fraction) {
    508       checkArgument(other instanceof MutableLastValueDouble, "MutableLastValueDouble expected.");
    509       MutableLastValueDouble otherValue = (MutableLastValueDouble) other;
    510       // Assume other is always newer than this, because we combined interval buckets in time order.
    511       // If there's a newer value, overwrite current value.
    512       this.lastValue = otherValue.initialized ? otherValue.getLastValue() : this.lastValue;
    513     }
    514 
    515     @Override
    516     AggregationData toAggregationData() {
    517       return AggregationData.LastValueDataDouble.create(lastValue);
    518     }
    519 
    520     @Override
    521     Point toPoint(Timestamp timestamp) {
    522       return Point.create(Value.doubleValue(lastValue), timestamp);
    523     }
    524 
    525     @VisibleForTesting
    526     double getLastValue() {
    527       return lastValue;
    528     }
    529   }
    530 
    531   /** Calculate last long value on aggregated {@code MeasureValue}s. */
    532   static final class MutableLastValueLong extends MutableLastValueDouble {
    533     private MutableLastValueLong() {
    534       super();
    535     }
    536 
    537     /**
    538      * Construct a {@code MutableLastValueLong}.
    539      *
    540      * @return an empty {@code MutableLastValueLong}.
    541      */
    542     static MutableLastValueLong create() {
    543       return new MutableLastValueLong();
    544     }
    545 
    546     @Override
    547     AggregationData toAggregationData() {
    548       return AggregationData.LastValueDataLong.create(Math.round(getLastValue()));
    549     }
    550 
    551     @Override
    552     Point toPoint(Timestamp timestamp) {
    553       return Point.create(Value.longValue(Math.round(getLastValue())), timestamp);
    554     }
    555   }
    556 }
    557