1 /* 2 * Copyright 2017, OpenCensus Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.opencensus.implcore.stats; 18 19 import static com.google.common.base.Preconditions.checkArgument; 20 import static com.google.common.base.Preconditions.checkNotNull; 21 22 import com.google.common.annotations.VisibleForTesting; 23 import io.opencensus.common.Timestamp; 24 import io.opencensus.metrics.export.Distribution; 25 import io.opencensus.metrics.export.Distribution.BucketOptions; 26 import io.opencensus.metrics.export.Point; 27 import io.opencensus.metrics.export.Value; 28 import io.opencensus.stats.Aggregation; 29 import io.opencensus.stats.AggregationData; 30 import io.opencensus.stats.AggregationData.DistributionData; 31 import io.opencensus.stats.AggregationData.DistributionData.Exemplar; 32 import io.opencensus.stats.BucketBoundaries; 33 import java.util.ArrayList; 34 import java.util.List; 35 import java.util.Map; 36 37 /** Mutable version of {@link Aggregation} that supports adding values. */ 38 abstract class MutableAggregation { 39 40 private MutableAggregation() {} 41 42 // Tolerance for double comparison. 43 private static final double TOLERANCE = 1e-6; 44 45 /** 46 * Put a new value into the MutableAggregation. 47 * 48 * @param value new value to be added to population 49 * @param attachments the contextual information on an {@link Exemplar} 50 * @param timestamp the timestamp when the value is recorded 51 */ 52 abstract void add(double value, Map<String, String> attachments, Timestamp timestamp); 53 54 // TODO(songya): remove this method once interval stats is completely removed. 55 /** 56 * Combine the internal values of this MutableAggregation and value of the given 57 * MutableAggregation, with the given fraction. Then set the internal value of this 58 * MutableAggregation to the combined value. 59 * 60 * @param other the other {@code MutableAggregation}. The type of this and other {@code 61 * MutableAggregation} must match. 62 * @param fraction the fraction that the value in other {@code MutableAggregation} should 63 * contribute. Must be within [0.0, 1.0]. 64 */ 65 abstract void combine(MutableAggregation other, double fraction); 66 67 abstract AggregationData toAggregationData(); 68 69 abstract Point toPoint(Timestamp timestamp); 70 71 /** Calculate sum of doubles on aggregated {@code MeasureValue}s. */ 72 static class MutableSumDouble extends MutableAggregation { 73 74 private double sum = 0.0; 75 76 private MutableSumDouble() {} 77 78 /** 79 * Construct a {@code MutableSumDouble}. 80 * 81 * @return an empty {@code MutableSumDouble}. 82 */ 83 static MutableSumDouble create() { 84 return new MutableSumDouble(); 85 } 86 87 @Override 88 void add(double value, Map<String, String> attachments, Timestamp timestamp) { 89 sum += value; 90 } 91 92 @Override 93 void combine(MutableAggregation other, double fraction) { 94 checkArgument(other instanceof MutableSumDouble, "MutableSumDouble expected."); 95 this.sum += fraction * ((MutableSumDouble) other).sum; 96 } 97 98 @Override 99 AggregationData toAggregationData() { 100 return AggregationData.SumDataDouble.create(sum); 101 } 102 103 @Override 104 Point toPoint(Timestamp timestamp) { 105 return Point.create(Value.doubleValue(sum), timestamp); 106 } 107 108 @VisibleForTesting 109 double getSum() { 110 return sum; 111 } 112 } 113 114 /** Calculate sum of longs on aggregated {@code MeasureValue}s. */ 115 static final class MutableSumLong extends MutableSumDouble { 116 private MutableSumLong() { 117 super(); 118 } 119 120 /** 121 * Construct a {@code MutableSumLong}. 122 * 123 * @return an empty {@code MutableSumLong}. 124 */ 125 static MutableSumLong create() { 126 return new MutableSumLong(); 127 } 128 129 @Override 130 AggregationData toAggregationData() { 131 return AggregationData.SumDataLong.create(Math.round(getSum())); 132 } 133 134 @Override 135 Point toPoint(Timestamp timestamp) { 136 return Point.create(Value.longValue(Math.round(getSum())), timestamp); 137 } 138 } 139 140 /** Calculate count on aggregated {@code MeasureValue}s. */ 141 static final class MutableCount extends MutableAggregation { 142 143 private long count = 0; 144 145 private MutableCount() {} 146 147 /** 148 * Construct a {@code MutableCount}. 149 * 150 * @return an empty {@code MutableCount}. 151 */ 152 static MutableCount create() { 153 return new MutableCount(); 154 } 155 156 @Override 157 void add(double value, Map<String, String> attachments, Timestamp timestamp) { 158 count++; 159 } 160 161 @Override 162 void combine(MutableAggregation other, double fraction) { 163 checkArgument(other instanceof MutableCount, "MutableCount expected."); 164 this.count += Math.round(fraction * ((MutableCount) other).getCount()); 165 } 166 167 @Override 168 AggregationData toAggregationData() { 169 return AggregationData.CountData.create(count); 170 } 171 172 @Override 173 Point toPoint(Timestamp timestamp) { 174 return Point.create(Value.longValue(count), timestamp); 175 } 176 177 /** 178 * Returns the aggregated count. 179 * 180 * @return the aggregated count. 181 */ 182 long getCount() { 183 return count; 184 } 185 } 186 187 /** Calculate mean on aggregated {@code MeasureValue}s. */ 188 static final class MutableMean extends MutableAggregation { 189 190 private double sum = 0.0; 191 private long count = 0; 192 193 private MutableMean() {} 194 195 /** 196 * Construct a {@code MutableMean}. 197 * 198 * @return an empty {@code MutableMean}. 199 */ 200 static MutableMean create() { 201 return new MutableMean(); 202 } 203 204 @Override 205 void add(double value, Map<String, String> attachments, Timestamp timestamp) { 206 count++; 207 sum += value; 208 } 209 210 @Override 211 void combine(MutableAggregation other, double fraction) { 212 checkArgument(other instanceof MutableMean, "MutableMean expected."); 213 MutableMean mutableMean = (MutableMean) other; 214 this.count += Math.round(mutableMean.count * fraction); 215 this.sum += mutableMean.sum * fraction; 216 } 217 218 @SuppressWarnings("deprecation") 219 @Override 220 AggregationData toAggregationData() { 221 return AggregationData.MeanData.create(getMean(), count); 222 } 223 224 @Override 225 Point toPoint(Timestamp timestamp) { 226 return Point.create(Value.doubleValue(getMean()), timestamp); 227 } 228 229 /** 230 * Returns the aggregated mean. 231 * 232 * @return the aggregated mean. 233 */ 234 double getMean() { 235 return count == 0 ? 0 : sum / count; 236 } 237 238 /** 239 * Returns the aggregated count. 240 * 241 * @return the aggregated count. 242 */ 243 long getCount() { 244 return count; 245 } 246 247 @VisibleForTesting 248 double getSum() { 249 return sum; 250 } 251 } 252 253 /** Calculate distribution stats on aggregated {@code MeasureValue}s. */ 254 static final class MutableDistribution extends MutableAggregation { 255 256 private double sum = 0.0; 257 private double mean = 0.0; 258 private long count = 0; 259 private double sumOfSquaredDeviations = 0.0; 260 261 // Initial "impossible" values, that will get reset as soon as first value is added. 262 private double min = Double.POSITIVE_INFINITY; 263 private double max = Double.NEGATIVE_INFINITY; 264 265 private final BucketBoundaries bucketBoundaries; 266 private final long[] bucketCounts; 267 268 // If there's a histogram (i.e bucket boundaries are not empty) in this MutableDistribution, 269 // exemplars will have the same size to bucketCounts; otherwise exemplars are null. 270 // Only the newest exemplar will be kept at each index. 271 @javax.annotation.Nullable private final Exemplar[] exemplars; 272 273 private MutableDistribution(BucketBoundaries bucketBoundaries) { 274 this.bucketBoundaries = bucketBoundaries; 275 int buckets = bucketBoundaries.getBoundaries().size() + 1; 276 this.bucketCounts = new long[buckets]; 277 // In the implementation, each histogram bucket can have up to one exemplar, and the exemplar 278 // array is guaranteed to be in ascending order. 279 // If there's no histogram, don't record exemplars. 280 this.exemplars = bucketBoundaries.getBoundaries().isEmpty() ? null : new Exemplar[buckets]; 281 } 282 283 /** 284 * Construct a {@code MutableDistribution}. 285 * 286 * @return an empty {@code MutableDistribution}. 287 */ 288 static MutableDistribution create(BucketBoundaries bucketBoundaries) { 289 checkNotNull(bucketBoundaries, "bucketBoundaries should not be null."); 290 return new MutableDistribution(bucketBoundaries); 291 } 292 293 @Override 294 void add(double value, Map<String, String> attachments, Timestamp timestamp) { 295 sum += value; 296 count++; 297 298 /* 299 * Update the sum of squared deviations from the mean with the given value. For values 300 * x_i this is Sum[i=1..n]((x_i - mean)^2) 301 * 302 * Computed using Welfords method (see 303 * https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance, or Knuth, "The Art of 304 * Computer Programming", Vol. 2, page 323, 3rd edition) 305 */ 306 double deltaFromMean = value - mean; 307 mean += deltaFromMean / count; 308 double deltaFromMean2 = value - mean; 309 sumOfSquaredDeviations += deltaFromMean * deltaFromMean2; 310 311 if (value < min) { 312 min = value; 313 } 314 if (value > max) { 315 max = value; 316 } 317 318 int bucket = 0; 319 for (; bucket < bucketBoundaries.getBoundaries().size(); bucket++) { 320 if (value < bucketBoundaries.getBoundaries().get(bucket)) { 321 break; 322 } 323 } 324 bucketCounts[bucket]++; 325 326 // No implicit recording for exemplars - if there are no attachments (contextual information), 327 // don't record exemplars. 328 if (!attachments.isEmpty() && exemplars != null) { 329 exemplars[bucket] = Exemplar.create(value, timestamp, attachments); 330 } 331 } 332 333 // We don't compute fractional MutableDistribution, it's either whole or none. 334 @Override 335 void combine(MutableAggregation other, double fraction) { 336 checkArgument(other instanceof MutableDistribution, "MutableDistribution expected."); 337 if (Math.abs(1.0 - fraction) > TOLERANCE) { 338 return; 339 } 340 341 MutableDistribution mutableDistribution = (MutableDistribution) other; 342 checkArgument( 343 this.bucketBoundaries.equals(mutableDistribution.bucketBoundaries), 344 "Bucket boundaries should match."); 345 346 // Algorithm for calculating the combination of sum of squared deviations: 347 // https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm. 348 if (this.count + mutableDistribution.count > 0) { 349 double delta = mutableDistribution.mean - this.mean; 350 this.sumOfSquaredDeviations = 351 this.sumOfSquaredDeviations 352 + mutableDistribution.sumOfSquaredDeviations 353 + Math.pow(delta, 2) 354 * this.count 355 * mutableDistribution.count 356 / (this.count + mutableDistribution.count); 357 } 358 359 this.count += mutableDistribution.count; 360 this.sum += mutableDistribution.sum; 361 this.mean = this.sum / this.count; 362 363 if (mutableDistribution.min < this.min) { 364 this.min = mutableDistribution.min; 365 } 366 if (mutableDistribution.max > this.max) { 367 this.max = mutableDistribution.max; 368 } 369 370 long[] bucketCounts = mutableDistribution.getBucketCounts(); 371 for (int i = 0; i < bucketCounts.length; i++) { 372 this.bucketCounts[i] += bucketCounts[i]; 373 } 374 375 Exemplar[] otherExemplars = mutableDistribution.getExemplars(); 376 if (exemplars != null && otherExemplars != null) { 377 for (int i = 0; i < otherExemplars.length; i++) { 378 Exemplar exemplar = otherExemplars[i]; 379 // Assume other is always newer than this, because we combined interval buckets in time 380 // order. 381 // If there's a newer exemplar, overwrite current value. 382 if (exemplar != null) { 383 this.exemplars[i] = exemplar; 384 } 385 } 386 } 387 } 388 389 @Override 390 AggregationData toAggregationData() { 391 List<Long> boxedBucketCounts = new ArrayList<Long>(); 392 for (long bucketCount : bucketCounts) { 393 boxedBucketCounts.add(bucketCount); 394 } 395 List<Exemplar> exemplarList = new ArrayList<Exemplar>(); 396 if (exemplars != null) { 397 for (Exemplar exemplar : exemplars) { 398 if (exemplar != null) { 399 exemplarList.add(exemplar); 400 } 401 } 402 } 403 return DistributionData.create( 404 mean, count, min, max, sumOfSquaredDeviations, boxedBucketCounts, exemplarList); 405 } 406 407 @Override 408 Point toPoint(Timestamp timestamp) { 409 List<Distribution.Bucket> buckets = new ArrayList<Distribution.Bucket>(); 410 for (int bucket = 0; bucket < bucketCounts.length; bucket++) { 411 long bucketCount = bucketCounts[bucket]; 412 @javax.annotation.Nullable AggregationData.DistributionData.Exemplar exemplar = null; 413 if (exemplars != null) { 414 exemplar = exemplars[bucket]; 415 } 416 417 Distribution.Bucket metricBucket; 418 if (exemplar != null) { 419 // Bucket with an Exemplar. 420 metricBucket = 421 Distribution.Bucket.create( 422 bucketCount, 423 Distribution.Exemplar.create( 424 exemplar.getValue(), exemplar.getTimestamp(), exemplar.getAttachments())); 425 } else { 426 // Bucket with no Exemplar. 427 metricBucket = Distribution.Bucket.create(bucketCount); 428 } 429 buckets.add(metricBucket); 430 } 431 432 // TODO(mayurkale): Drop the first bucket when converting to metrics. 433 // Reason: In Stats API, bucket bounds begin with -infinity (first bucket is (-infinity, 0)). 434 BucketOptions bucketOptions = BucketOptions.explicitOptions(bucketBoundaries.getBoundaries()); 435 436 return Point.create( 437 Value.distributionValue( 438 Distribution.create( 439 count, mean * count, sumOfSquaredDeviations, bucketOptions, buckets)), 440 timestamp); 441 } 442 443 double getMean() { 444 return mean; 445 } 446 447 long getCount() { 448 return count; 449 } 450 451 double getMin() { 452 return min; 453 } 454 455 double getMax() { 456 return max; 457 } 458 459 // Returns the aggregated sum of squared deviations. 460 double getSumOfSquaredDeviations() { 461 return sumOfSquaredDeviations; 462 } 463 464 long[] getBucketCounts() { 465 return bucketCounts; 466 } 467 468 BucketBoundaries getBucketBoundaries() { 469 return bucketBoundaries; 470 } 471 472 @javax.annotation.Nullable 473 Exemplar[] getExemplars() { 474 return exemplars; 475 } 476 } 477 478 /** Calculate double last value on aggregated {@code MeasureValue}s. */ 479 static class MutableLastValueDouble extends MutableAggregation { 480 481 // Initial value that will get reset as soon as first value is added. 482 private double lastValue = Double.NaN; 483 // TODO(songya): remove this once interval stats is completely removed. 484 private boolean initialized = false; 485 486 private MutableLastValueDouble() {} 487 488 /** 489 * Construct a {@code MutableLastValueDouble}. 490 * 491 * @return an empty {@code MutableLastValueDouble}. 492 */ 493 static MutableLastValueDouble create() { 494 return new MutableLastValueDouble(); 495 } 496 497 @Override 498 void add(double value, Map<String, String> attachments, Timestamp timestamp) { 499 lastValue = value; 500 // TODO(songya): remove this once interval stats is completely removed. 501 if (!initialized) { 502 initialized = true; 503 } 504 } 505 506 @Override 507 void combine(MutableAggregation other, double fraction) { 508 checkArgument(other instanceof MutableLastValueDouble, "MutableLastValueDouble expected."); 509 MutableLastValueDouble otherValue = (MutableLastValueDouble) other; 510 // Assume other is always newer than this, because we combined interval buckets in time order. 511 // If there's a newer value, overwrite current value. 512 this.lastValue = otherValue.initialized ? otherValue.getLastValue() : this.lastValue; 513 } 514 515 @Override 516 AggregationData toAggregationData() { 517 return AggregationData.LastValueDataDouble.create(lastValue); 518 } 519 520 @Override 521 Point toPoint(Timestamp timestamp) { 522 return Point.create(Value.doubleValue(lastValue), timestamp); 523 } 524 525 @VisibleForTesting 526 double getLastValue() { 527 return lastValue; 528 } 529 } 530 531 /** Calculate last long value on aggregated {@code MeasureValue}s. */ 532 static final class MutableLastValueLong extends MutableLastValueDouble { 533 private MutableLastValueLong() { 534 super(); 535 } 536 537 /** 538 * Construct a {@code MutableLastValueLong}. 539 * 540 * @return an empty {@code MutableLastValueLong}. 541 */ 542 static MutableLastValueLong create() { 543 return new MutableLastValueLong(); 544 } 545 546 @Override 547 AggregationData toAggregationData() { 548 return AggregationData.LastValueDataLong.create(Math.round(getLastValue())); 549 } 550 551 @Override 552 Point toPoint(Timestamp timestamp) { 553 return Point.create(Value.longValue(Math.round(getLastValue())), timestamp); 554 } 555 } 556 } 557