1 /* 2 * Copyright (C) 2017 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #define DEBUG false // STOPSHIP if true 18 #include "Log.h" 19 20 #include "ValueMetricProducer.h" 21 #include "../guardrail/StatsdStats.h" 22 #include "../stats_log_util.h" 23 24 #include <cutils/log.h> 25 #include <limits.h> 26 #include <stdlib.h> 27 28 using android::util::FIELD_COUNT_REPEATED; 29 using android::util::FIELD_TYPE_BOOL; 30 using android::util::FIELD_TYPE_DOUBLE; 31 using android::util::FIELD_TYPE_INT32; 32 using android::util::FIELD_TYPE_INT64; 33 using android::util::FIELD_TYPE_MESSAGE; 34 using android::util::FIELD_TYPE_STRING; 35 using android::util::ProtoOutputStream; 36 using std::list; 37 using std::make_pair; 38 using std::make_shared; 39 using std::map; 40 using std::shared_ptr; 41 using std::unique_ptr; 42 using std::unordered_map; 43 44 namespace android { 45 namespace os { 46 namespace statsd { 47 48 // for StatsLogReport 49 const int FIELD_ID_ID = 1; 50 const int FIELD_ID_VALUE_METRICS = 7; 51 const int FIELD_ID_TIME_BASE = 9; 52 const int FIELD_ID_BUCKET_SIZE = 10; 53 const int FIELD_ID_DIMENSION_PATH_IN_WHAT = 11; 54 const int FIELD_ID_DIMENSION_PATH_IN_CONDITION = 12; 55 const int FIELD_ID_IS_ACTIVE = 14; 56 // for ValueMetricDataWrapper 57 const int FIELD_ID_DATA = 1; 58 const int FIELD_ID_SKIPPED = 2; 59 const int FIELD_ID_SKIPPED_START_MILLIS = 3; 60 const int FIELD_ID_SKIPPED_END_MILLIS = 4; 61 // 
for ValueMetricData 62 const int FIELD_ID_DIMENSION_IN_WHAT = 1; 63 const int FIELD_ID_DIMENSION_IN_CONDITION = 2; 64 const int FIELD_ID_BUCKET_INFO = 3; 65 const int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4; 66 const int FIELD_ID_DIMENSION_LEAF_IN_CONDITION = 5; 67 // for ValueBucketInfo 68 const int FIELD_ID_VALUE_INDEX = 1; 69 const int FIELD_ID_VALUE_LONG = 2; 70 const int FIELD_ID_VALUE_DOUBLE = 3; 71 const int FIELD_ID_VALUES = 9; 72 const int FIELD_ID_BUCKET_NUM = 4; 73 const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5; 74 const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6; 75 const int FIELD_ID_CONDITION_TRUE_NS = 10; 76 77 const Value ZERO_LONG((int64_t)0); 78 const Value ZERO_DOUBLE((int64_t)0); 79 80 // ValueMetric has a minimum bucket size of 10min so that we don't pull too frequently 81 ValueMetricProducer::ValueMetricProducer( 82 const ConfigKey& key, const ValueMetric& metric, const int conditionIndex, 83 const sp<ConditionWizard>& conditionWizard, const int whatMatcherIndex, 84 const sp<EventMatcherWizard>& matcherWizard, const int pullTagId, const int64_t timeBaseNs, 85 const int64_t startTimeNs, const sp<StatsPullerManager>& pullerManager) 86 : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, conditionWizard), 87 mWhatMatcherIndex(whatMatcherIndex), 88 mEventMatcherWizard(matcherWizard), 89 mPullerManager(pullerManager), 90 mPullTagId(pullTagId), 91 mIsPulled(pullTagId != -1), 92 mMinBucketSizeNs(metric.min_bucket_size_nanos()), 93 mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) != 94 StatsdStats::kAtomDimensionKeySizeLimitMap.end() 95 ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first 96 : StatsdStats::kDimensionKeySizeSoftLimit), 97 mDimensionHardLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) != 98 StatsdStats::kAtomDimensionKeySizeLimitMap.end() 99 ? 
StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).second 100 : StatsdStats::kDimensionKeySizeHardLimit), 101 mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()), 102 mAggregationType(metric.aggregation_type()), 103 mUseDiff(metric.has_use_diff() ? metric.use_diff() : (mIsPulled ? true : false)), 104 mValueDirection(metric.value_direction()), 105 mSkipZeroDiffOutput(metric.skip_zero_diff_output()), 106 mUseZeroDefaultBase(metric.use_zero_default_base()), 107 mHasGlobalBase(false), 108 mCurrentBucketIsInvalid(false), 109 mMaxPullDelayNs(metric.max_pull_delay_sec() > 0 ? metric.max_pull_delay_sec() * NS_PER_SEC 110 : StatsdStats::kPullMaxDelayNs), 111 mSplitBucketForAppUpgrade(metric.split_bucket_for_app_upgrade()), 112 // Condition timer will be set in prepareFirstBucketLocked. 113 mConditionTimer(false, timeBaseNs) { 114 int64_t bucketSizeMills = 0; 115 if (metric.has_bucket()) { 116 bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket()); 117 } else { 118 bucketSizeMills = TimeUnitToBucketSizeInMillis(ONE_HOUR); 119 } 120 121 mBucketSizeNs = bucketSizeMills * 1000000; 122 123 translateFieldMatcher(metric.value_field(), &mFieldMatchers); 124 125 if (metric.has_dimensions_in_what()) { 126 translateFieldMatcher(metric.dimensions_in_what(), &mDimensionsInWhat); 127 mContainANYPositionInDimensionsInWhat = HasPositionANY(metric.dimensions_in_what()); 128 } 129 130 if (metric.has_dimensions_in_condition()) { 131 translateFieldMatcher(metric.dimensions_in_condition(), &mDimensionsInCondition); 132 } 133 134 if (metric.links().size() > 0) { 135 for (const auto& link : metric.links()) { 136 Metric2Condition mc; 137 mc.conditionId = link.condition(); 138 translateFieldMatcher(link.fields_in_what(), &mc.metricFields); 139 translateFieldMatcher(link.fields_in_condition(), &mc.conditionFields); 140 mMetric2ConditionLinks.push_back(mc); 141 } 142 } 143 144 mConditionSliced = (metric.links().size() > 0) || 
(mDimensionsInCondition.size() > 0); 145 mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what()) || 146 HasPositionALL(metric.dimensions_in_condition()); 147 148 int64_t numBucketsForward = calcBucketsForwardCount(startTimeNs); 149 mCurrentBucketNum += numBucketsForward; 150 151 flushIfNeededLocked(startTimeNs); 152 153 if (mIsPulled) { 154 mPullerManager->RegisterReceiver(mPullTagId, this, getCurrentBucketEndTimeNs(), 155 mBucketSizeNs); 156 } 157 158 // Only do this for partial buckets like first bucket. All other buckets should use 159 // flushIfNeeded to adjust start and end to bucket boundaries. 160 // Adjust start for partial bucket 161 mCurrentBucketStartTimeNs = startTimeNs; 162 mConditionTimer.newBucketStart(mCurrentBucketStartTimeNs); 163 VLOG("value metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(), 164 (long long)mBucketSizeNs, (long long)mTimeBaseNs); 165 } 166 167 ValueMetricProducer::~ValueMetricProducer() { 168 VLOG("~ValueMetricProducer() called"); 169 if (mIsPulled) { 170 mPullerManager->UnRegisterReceiver(mPullTagId, this); 171 } 172 } 173 174 void ValueMetricProducer::prepareFirstBucketLocked() { 175 // Kicks off the puller immediately if condition is true and diff based. 176 if (mIsActive && mIsPulled && mCondition == ConditionState::kTrue && mUseDiff) { 177 pullAndMatchEventsLocked(mCurrentBucketStartTimeNs, mCondition); 178 } 179 // Now that activations are processed, start the condition timer if needed. 
180 mConditionTimer.onConditionChanged(mIsActive && mCondition == ConditionState::kTrue, 181 mCurrentBucketStartTimeNs); 182 } 183 184 void ValueMetricProducer::onSlicedConditionMayChangeLocked(bool overallCondition, 185 const int64_t eventTime) { 186 VLOG("Metric %lld onSlicedConditionMayChange", (long long)mMetricId); 187 } 188 189 void ValueMetricProducer::dropDataLocked(const int64_t dropTimeNs) { 190 StatsdStats::getInstance().noteBucketDropped(mMetricId); 191 // We are going to flush the data without doing a pull first so we need to invalidte the data. 192 bool pullNeeded = mIsPulled && mCondition == ConditionState::kTrue; 193 if (pullNeeded) { 194 invalidateCurrentBucket(); 195 } 196 flushIfNeededLocked(dropTimeNs); 197 clearPastBucketsLocked(dropTimeNs); 198 } 199 200 void ValueMetricProducer::clearPastBucketsLocked(const int64_t dumpTimeNs) { 201 mPastBuckets.clear(); 202 mSkippedBuckets.clear(); 203 } 204 205 void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, 206 const bool include_current_partial_bucket, 207 const bool erase_data, 208 const DumpLatency dumpLatency, 209 std::set<string> *str_set, 210 ProtoOutputStream* protoOutput) { 211 VLOG("metric %lld dump report now...", (long long)mMetricId); 212 if (include_current_partial_bucket) { 213 // For pull metrics, we need to do a pull at bucket boundaries. If we do not do that the 214 // current bucket will have incomplete data and the next will have the wrong snapshot to do 215 // a diff against. If the condition is false, we are fine since the base data is reset and 216 // we are not tracking anything. 
217 bool pullNeeded = mIsPulled && mCondition == ConditionState::kTrue; 218 if (pullNeeded) { 219 switch (dumpLatency) { 220 case FAST: 221 invalidateCurrentBucket(); 222 break; 223 case NO_TIME_CONSTRAINTS: 224 pullAndMatchEventsLocked(dumpTimeNs, mCondition); 225 break; 226 } 227 } 228 flushCurrentBucketLocked(dumpTimeNs, dumpTimeNs); 229 } 230 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId); 231 protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_IS_ACTIVE, isActiveLocked()); 232 233 if (mPastBuckets.empty() && mSkippedBuckets.empty()) { 234 return; 235 } 236 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_TIME_BASE, (long long)mTimeBaseNs); 237 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_SIZE, (long long)mBucketSizeNs); 238 // Fills the dimension path if not slicing by ALL. 239 if (!mSliceByPositionALL) { 240 if (!mDimensionsInWhat.empty()) { 241 uint64_t dimenPathToken = 242 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_WHAT); 243 writeDimensionPathToProto(mDimensionsInWhat, protoOutput); 244 protoOutput->end(dimenPathToken); 245 } 246 if (!mDimensionsInCondition.empty()) { 247 uint64_t dimenPathToken = 248 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_CONDITION); 249 writeDimensionPathToProto(mDimensionsInCondition, protoOutput); 250 protoOutput->end(dimenPathToken); 251 } 252 } 253 254 uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_VALUE_METRICS); 255 256 for (const auto& pair : mSkippedBuckets) { 257 uint64_t wrapperToken = 258 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED); 259 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START_MILLIS, 260 (long long)(NanoToMillis(pair.first))); 261 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END_MILLIS, 262 (long long)(NanoToMillis(pair.second))); 263 protoOutput->end(wrapperToken); 264 } 265 266 for (const auto& pair : mPastBuckets) { 267 const MetricDimensionKey& 
dimensionKey = pair.first; 268 VLOG(" dimension key %s", dimensionKey.toString().c_str()); 269 uint64_t wrapperToken = 270 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA); 271 272 // First fill dimension. 273 if (mSliceByPositionALL) { 274 uint64_t dimensionToken = 275 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT); 276 writeDimensionToProto(dimensionKey.getDimensionKeyInWhat(), str_set, protoOutput); 277 protoOutput->end(dimensionToken); 278 if (dimensionKey.hasDimensionKeyInCondition()) { 279 uint64_t dimensionInConditionToken = 280 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION); 281 writeDimensionToProto(dimensionKey.getDimensionKeyInCondition(), str_set, 282 protoOutput); 283 protoOutput->end(dimensionInConditionToken); 284 } 285 } else { 286 writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInWhat(), 287 FIELD_ID_DIMENSION_LEAF_IN_WHAT, str_set, protoOutput); 288 if (dimensionKey.hasDimensionKeyInCondition()) { 289 writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInCondition(), 290 FIELD_ID_DIMENSION_LEAF_IN_CONDITION, str_set, 291 protoOutput); 292 } 293 } 294 295 // Then fill bucket_info (ValueBucketInfo). 296 for (const auto& bucket : pair.second) { 297 uint64_t bucketInfoToken = protoOutput->start( 298 FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO); 299 300 if (bucket.mBucketEndNs - bucket.mBucketStartNs != mBucketSizeNs) { 301 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_ELAPSED_MILLIS, 302 (long long)NanoToMillis(bucket.mBucketStartNs)); 303 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_ELAPSED_MILLIS, 304 (long long)NanoToMillis(bucket.mBucketEndNs)); 305 } else { 306 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_NUM, 307 (long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs))); 308 } 309 // only write the condition timer value if the metric has a condition. 
310 if (mConditionTrackerIndex >= 0) { 311 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_CONDITION_TRUE_NS, 312 (long long)bucket.mConditionTrueNs); 313 } 314 for (int i = 0; i < (int)bucket.valueIndex.size(); i ++) { 315 int index = bucket.valueIndex[i]; 316 const Value& value = bucket.values[i]; 317 uint64_t valueToken = protoOutput->start( 318 FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_VALUES); 319 protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_INDEX, 320 index); 321 if (value.getType() == LONG) { 322 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG, 323 (long long)value.long_value); 324 VLOG("\t bucket [%lld - %lld] value %d: %lld", (long long)bucket.mBucketStartNs, 325 (long long)bucket.mBucketEndNs, index, (long long)value.long_value); 326 } else if (value.getType() == DOUBLE) { 327 protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE, 328 value.double_value); 329 VLOG("\t bucket [%lld - %lld] value %d: %.2f", (long long)bucket.mBucketStartNs, 330 (long long)bucket.mBucketEndNs, index, value.double_value); 331 } else { 332 VLOG("Wrong value type for ValueMetric output: %d", value.getType()); 333 } 334 protoOutput->end(valueToken); 335 } 336 protoOutput->end(bucketInfoToken); 337 } 338 protoOutput->end(wrapperToken); 339 } 340 protoOutput->end(protoToken); 341 342 VLOG("metric %lld dump report now...", (long long)mMetricId); 343 if (erase_data) { 344 mPastBuckets.clear(); 345 mSkippedBuckets.clear(); 346 } 347 } 348 349 void ValueMetricProducer::invalidateCurrentBucketWithoutResetBase() { 350 if (!mCurrentBucketIsInvalid) { 351 // Only report once per invalid bucket. 
352 StatsdStats::getInstance().noteInvalidatedBucket(mMetricId); 353 } 354 mCurrentBucketIsInvalid = true; 355 } 356 357 void ValueMetricProducer::invalidateCurrentBucket() { 358 invalidateCurrentBucketWithoutResetBase(); 359 resetBase(); 360 } 361 362 void ValueMetricProducer::resetBase() { 363 for (auto& slice : mCurrentSlicedBucket) { 364 for (auto& interval : slice.second) { 365 interval.hasBase = false; 366 } 367 } 368 mHasGlobalBase = false; 369 } 370 371 // Handle active state change. Active state change is treated like a condition change: 372 // - drop bucket if active state change event arrives too late 373 // - if condition is true, pull data on active state changes 374 // - ConditionTimer tracks changes based on AND of condition and active state. 375 void ValueMetricProducer::onActiveStateChangedLocked(const int64_t& eventTimeNs) { 376 bool isEventTooLate = eventTimeNs < mCurrentBucketStartTimeNs; 377 if (ConditionState::kTrue == mCondition && isEventTooLate) { 378 // Drop bucket because event arrived too late, ie. we are missing data for this bucket. 379 invalidateCurrentBucket(); 380 } 381 382 // Call parent method once we've verified the validity of current bucket. 383 MetricProducer::onActiveStateChangedLocked(eventTimeNs); 384 385 if (ConditionState::kTrue != mCondition) { 386 return; 387 } 388 389 // Pull on active state changes. 390 if (!isEventTooLate) { 391 if (mIsPulled) { 392 pullAndMatchEventsLocked(eventTimeNs, mCondition); 393 } 394 // When active state changes from true to false, clear diff base but don't 395 // reset other counters as we may accumulate more value in the bucket. 396 if (mUseDiff && !mIsActive) { 397 resetBase(); 398 } 399 } 400 401 flushIfNeededLocked(eventTimeNs); 402 403 // Let condition timer know of new active state. 
404 mConditionTimer.onConditionChanged(mIsActive, eventTimeNs); 405 } 406 407 void ValueMetricProducer::onConditionChangedLocked(const bool condition, 408 const int64_t eventTimeNs) { 409 ConditionState newCondition = condition ? ConditionState::kTrue : ConditionState::kFalse; 410 bool isEventTooLate = eventTimeNs < mCurrentBucketStartTimeNs; 411 412 if (mIsActive) { 413 if (isEventTooLate) { 414 VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs, 415 (long long)mCurrentBucketStartTimeNs); 416 StatsdStats::getInstance().noteConditionChangeInNextBucket(mMetricId); 417 invalidateCurrentBucket(); 418 } else { 419 if (mCondition == ConditionState::kUnknown) { 420 // If the condition was unknown, we mark the bucket as invalid since the bucket will 421 // contain partial data. For instance, the condition change might happen close to 422 // the end of the bucket and we might miss lots of data. 423 // 424 // We still want to pull to set the base. 425 invalidateCurrentBucket(); 426 } 427 428 // Pull on condition changes. 429 bool conditionChanged = 430 (mCondition == ConditionState::kTrue && newCondition == ConditionState::kFalse) 431 || (mCondition == ConditionState::kFalse && 432 newCondition == ConditionState::kTrue); 433 // We do not need to pull when we go from unknown to false. 434 // 435 // We also pull if the condition was already true in order to be able to flush the 436 // bucket at the end if needed. 437 // 438 // onConditionChangedLocked might happen on bucket boundaries if this is called before 439 // #onDataPulled. 440 if (mIsPulled && (conditionChanged || condition)) { 441 pullAndMatchEventsLocked(eventTimeNs, newCondition); 442 } 443 444 // When condition change from true to false, clear diff base but don't 445 // reset other counters as we may accumulate more value in the bucket. 
446 if (mUseDiff && mCondition == ConditionState::kTrue 447 && newCondition == ConditionState::kFalse) { 448 resetBase(); 449 } 450 } 451 } 452 453 mCondition = isEventTooLate ? initialCondition(mConditionTrackerIndex) : newCondition; 454 455 if (mIsActive) { 456 flushIfNeededLocked(eventTimeNs); 457 mConditionTimer.onConditionChanged(mCondition, eventTimeNs); 458 } 459 } 460 461 void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs, 462 ConditionState condition) { 463 vector<std::shared_ptr<LogEvent>> allData; 464 if (!mPullerManager->Pull(mPullTagId, &allData)) { 465 ALOGE("Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs); 466 invalidateCurrentBucket(); 467 return; 468 } 469 470 accumulateEvents(allData, timestampNs, timestampNs, condition); 471 } 472 473 int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) { 474 return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs; 475 } 476 477 // By design, statsd pulls data at bucket boundaries using AlarmManager. These pulls are likely 478 // to be delayed. Other events like condition changes or app upgrade which are not based on 479 // AlarmManager might have arrived earlier and close the bucket. 480 void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData, 481 bool pullSuccess, int64_t originalPullTimeNs) { 482 std::lock_guard<std::mutex> lock(mMutex); 483 if (mCondition == ConditionState::kTrue) { 484 // If the pull failed, we won't be able to compute a diff. 485 if (!pullSuccess) { 486 invalidateCurrentBucket(); 487 } else { 488 bool isEventLate = originalPullTimeNs < getCurrentBucketEndTimeNs(); 489 if (isEventLate) { 490 // If the event is late, we are in the middle of a bucket. Just 491 // process the data without trying to snap the data to the nearest bucket. 
492 accumulateEvents(allData, originalPullTimeNs, originalPullTimeNs, mCondition); 493 } else { 494 // For scheduled pulled data, the effective event time is snap to the nearest 495 // bucket end. In the case of waking up from a deep sleep state, we will 496 // attribute to the previous bucket end. If the sleep was long but not very 497 // long, we will be in the immediate next bucket. Previous bucket may get a 498 // larger number as we pull at a later time than real bucket end. 499 // 500 // If the sleep was very long, we skip more than one bucket before sleep. In 501 // this case, if the diff base will be cleared and this new data will serve as 502 // new diff base. 503 int64_t bucketEndTime = calcPreviousBucketEndTime(originalPullTimeNs) - 1; 504 StatsdStats::getInstance().noteBucketBoundaryDelayNs( 505 mMetricId, originalPullTimeNs - bucketEndTime); 506 accumulateEvents(allData, originalPullTimeNs, bucketEndTime, mCondition); 507 } 508 } 509 } 510 511 // We can probably flush the bucket. Since we used bucketEndTime when calling 512 // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed. 
513 flushIfNeededLocked(originalPullTimeNs); 514 } 515 516 void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<LogEvent>>& allData, 517 int64_t originalPullTimeNs, int64_t eventElapsedTimeNs, 518 ConditionState condition) { 519 bool isEventLate = eventElapsedTimeNs < mCurrentBucketStartTimeNs; 520 if (isEventLate) { 521 VLOG("Skip bucket end pull due to late arrival: %lld vs %lld", 522 (long long)eventElapsedTimeNs, (long long)mCurrentBucketStartTimeNs); 523 StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId); 524 invalidateCurrentBucket(); 525 return; 526 } 527 528 const int64_t pullDelayNs = getElapsedRealtimeNs() - originalPullTimeNs; 529 StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs); 530 if (pullDelayNs > mMaxPullDelayNs) { 531 ALOGE("Pull finish too late for atom %d, longer than %lld", mPullTagId, 532 (long long)mMaxPullDelayNs); 533 StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId); 534 // We are missing one pull from the bucket which means we will not have a complete view of 535 // what's going on. 536 invalidateCurrentBucket(); 537 return; 538 } 539 540 if (allData.size() == 0) { 541 VLOG("Data pulled is empty"); 542 StatsdStats::getInstance().noteEmptyData(mPullTagId); 543 } 544 545 mMatchedMetricDimensionKeys.clear(); 546 for (const auto& data : allData) { 547 LogEvent localCopy = data->makeCopy(); 548 if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) == 549 MatchingState::kMatched) { 550 localCopy.setElapsedTimestampNs(eventElapsedTimeNs); 551 onMatchedLogEventLocked(mWhatMatcherIndex, localCopy); 552 } 553 } 554 // If the new pulled data does not contains some keys we track in our intervals, we need to 555 // reset the base. 
556 for (auto& slice : mCurrentSlicedBucket) { 557 bool presentInPulledData = mMatchedMetricDimensionKeys.find(slice.first) 558 != mMatchedMetricDimensionKeys.end(); 559 if (!presentInPulledData) { 560 for (auto& interval : slice.second) { 561 interval.hasBase = false; 562 } 563 } 564 } 565 mMatchedMetricDimensionKeys.clear(); 566 mHasGlobalBase = true; 567 568 // If we reach the guardrail, we might have dropped some data which means the bucket is 569 // incomplete. 570 // 571 // The base also needs to be reset. If we do not have the full data, we might 572 // incorrectly compute the diff when mUseZeroDefaultBase is true since an existing key 573 // might be missing from mCurrentSlicedBucket. 574 if (hasReachedGuardRailLimit()) { 575 invalidateCurrentBucket(); 576 mCurrentSlicedBucket.clear(); 577 } 578 } 579 580 void ValueMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const { 581 if (mCurrentSlicedBucket.size() == 0) { 582 return; 583 } 584 585 fprintf(out, "ValueMetric %lld dimension size %lu\n", (long long)mMetricId, 586 (unsigned long)mCurrentSlicedBucket.size()); 587 if (verbose) { 588 for (const auto& it : mCurrentSlicedBucket) { 589 for (const auto& interval : it.second) { 590 fprintf(out, "\t(what)%s\t(condition)%s (value)%s\n", 591 it.first.getDimensionKeyInWhat().toString().c_str(), 592 it.first.getDimensionKeyInCondition().toString().c_str(), 593 interval.value.toString().c_str()); 594 } 595 } 596 } 597 } 598 599 bool ValueMetricProducer::hasReachedGuardRailLimit() const { 600 return mCurrentSlicedBucket.size() >= mDimensionHardLimit; 601 } 602 603 bool ValueMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) { 604 // ===========GuardRail============== 605 // 1. 
Report the tuple count if the tuple count > soft limit 606 if (mCurrentSlicedBucket.find(newKey) != mCurrentSlicedBucket.end()) { 607 return false; 608 } 609 if (mCurrentSlicedBucket.size() > mDimensionSoftLimit - 1) { 610 size_t newTupleCount = mCurrentSlicedBucket.size() + 1; 611 StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount); 612 // 2. Don't add more tuples, we are above the allowed threshold. Drop the data. 613 if (hasReachedGuardRailLimit()) { 614 ALOGE("ValueMetric %lld dropping data for dimension key %s", (long long)mMetricId, 615 newKey.toString().c_str()); 616 StatsdStats::getInstance().noteHardDimensionLimitReached(mMetricId); 617 return true; 618 } 619 } 620 621 return false; 622 } 623 624 bool ValueMetricProducer::hitFullBucketGuardRailLocked(const MetricDimensionKey& newKey) { 625 // ===========GuardRail============== 626 // 1. Report the tuple count if the tuple count > soft limit 627 if (mCurrentFullBucket.find(newKey) != mCurrentFullBucket.end()) { 628 return false; 629 } 630 if (mCurrentFullBucket.size() > mDimensionSoftLimit - 1) { 631 size_t newTupleCount = mCurrentFullBucket.size() + 1; 632 // 2. Don't add more tuples, we are above the allowed threshold. Drop the data. 
633 if (newTupleCount > mDimensionHardLimit) { 634 ALOGE("ValueMetric %lld dropping data for full bucket dimension key %s", 635 (long long)mMetricId, 636 newKey.toString().c_str()); 637 return true; 638 } 639 } 640 641 return false; 642 } 643 644 bool getDoubleOrLong(const LogEvent& event, const Matcher& matcher, Value& ret) { 645 for (const FieldValue& value : event.getValues()) { 646 if (value.mField.matches(matcher)) { 647 switch (value.mValue.type) { 648 case INT: 649 ret.setLong(value.mValue.int_value); 650 break; 651 case LONG: 652 ret.setLong(value.mValue.long_value); 653 break; 654 case FLOAT: 655 ret.setDouble(value.mValue.float_value); 656 break; 657 case DOUBLE: 658 ret.setDouble(value.mValue.double_value); 659 break; 660 default: 661 break; 662 } 663 return true; 664 } 665 } 666 return false; 667 } 668 669 void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIndex, 670 const MetricDimensionKey& eventKey, 671 const ConditionKey& conditionKey, 672 bool condition, const LogEvent& event) { 673 int64_t eventTimeNs = event.GetElapsedTimestampNs(); 674 if (eventTimeNs < mCurrentBucketStartTimeNs) { 675 VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs, 676 (long long)mCurrentBucketStartTimeNs); 677 return; 678 } 679 mMatchedMetricDimensionKeys.insert(eventKey); 680 681 if (!mIsPulled) { 682 // We cannot flush without doing a pull first. 683 flushIfNeededLocked(eventTimeNs); 684 } 685 686 // We should not accumulate the data for pushed metrics when the condition is false. 687 bool shouldSkipForPushMetric = !mIsPulled && !condition; 688 // For pulled metrics, there are two cases: 689 // - to compute diffs, we need to process all the state changes 690 // - for non-diffs metrics, we should ignore the data if the condition wasn't true. If we have a 691 // state change from 692 // + True -> True: we should process the data, it might be a bucket boundary 693 // + True -> False: we als need to process the data. 
694 bool shouldSkipForPulledMetric = mIsPulled && !mUseDiff 695 && mCondition != ConditionState::kTrue; 696 if (shouldSkipForPushMetric || shouldSkipForPulledMetric) { 697 VLOG("ValueMetric skip event because condition is false"); 698 return; 699 } 700 701 if (hitGuardRailLocked(eventKey)) { 702 return; 703 } 704 vector<Interval>& multiIntervals = mCurrentSlicedBucket[eventKey]; 705 if (multiIntervals.size() < mFieldMatchers.size()) { 706 VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size()); 707 multiIntervals.resize(mFieldMatchers.size()); 708 } 709 710 // We only use anomaly detection under certain cases. 711 // N.B.: The anomaly detection cases were modified in order to fix an issue with value metrics 712 // containing multiple values. We tried to retain all previous behaviour, but we are unsure the 713 // previous behaviour was correct. At the time of the fix, anomaly detection had no owner. 714 // Whoever next works on it should look into the cases where it is triggered in this function. 715 // Discussion here: http://ag/6124370. 716 bool useAnomalyDetection = true; 717 718 for (int i = 0; i < (int)mFieldMatchers.size(); i++) { 719 const Matcher& matcher = mFieldMatchers[i]; 720 Interval& interval = multiIntervals[i]; 721 interval.valueIndex = i; 722 Value value; 723 if (!getDoubleOrLong(event, matcher, value)) { 724 VLOG("Failed to get value %d from event %s", i, event.ToString().c_str()); 725 StatsdStats::getInstance().noteBadValueType(mMetricId); 726 return; 727 } 728 interval.seenNewData = true; 729 730 if (mUseDiff) { 731 if (!interval.hasBase) { 732 if (mHasGlobalBase && mUseZeroDefaultBase) { 733 // The bucket has global base. This key does not. 734 // Optionally use zero as base. 735 interval.base = (value.type == LONG ? ZERO_LONG : ZERO_DOUBLE); 736 interval.hasBase = true; 737 } else { 738 // no base. just update base and return. 
739 interval.base = value; 740 interval.hasBase = true; 741 // If we're missing a base, do not use anomaly detection on incomplete data 742 useAnomalyDetection = false; 743 // Continue (instead of return) here in order to set interval.base and 744 // interval.hasBase for other intervals 745 continue; 746 } 747 } 748 Value diff; 749 switch (mValueDirection) { 750 case ValueMetric::INCREASING: 751 if (value >= interval.base) { 752 diff = value - interval.base; 753 } else if (mUseAbsoluteValueOnReset) { 754 diff = value; 755 } else { 756 VLOG("Unexpected decreasing value"); 757 StatsdStats::getInstance().notePullDataError(mPullTagId); 758 interval.base = value; 759 // If we've got bad data, do not use anomaly detection 760 useAnomalyDetection = false; 761 continue; 762 } 763 break; 764 case ValueMetric::DECREASING: 765 if (interval.base >= value) { 766 diff = interval.base - value; 767 } else if (mUseAbsoluteValueOnReset) { 768 diff = value; 769 } else { 770 VLOG("Unexpected increasing value"); 771 StatsdStats::getInstance().notePullDataError(mPullTagId); 772 interval.base = value; 773 // If we've got bad data, do not use anomaly detection 774 useAnomalyDetection = false; 775 continue; 776 } 777 break; 778 case ValueMetric::ANY: 779 diff = value - interval.base; 780 break; 781 default: 782 break; 783 } 784 interval.base = value; 785 value = diff; 786 } 787 788 if (interval.hasValue) { 789 switch (mAggregationType) { 790 case ValueMetric::SUM: 791 // for AVG, we add up and take average when flushing the bucket 792 case ValueMetric::AVG: 793 interval.value += value; 794 break; 795 case ValueMetric::MIN: 796 interval.value = std::min(value, interval.value); 797 break; 798 case ValueMetric::MAX: 799 interval.value = std::max(value, interval.value); 800 break; 801 default: 802 break; 803 } 804 } else { 805 interval.value = value; 806 interval.hasValue = true; 807 } 808 interval.sampleSize += 1; 809 } 810 811 // Only trigger the tracker if all intervals are correct 812 if 
(useAnomalyDetection) { 813 // TODO: propgate proper values down stream when anomaly support doubles 814 long wholeBucketVal = multiIntervals[0].value.long_value; 815 auto prev = mCurrentFullBucket.find(eventKey); 816 if (prev != mCurrentFullBucket.end()) { 817 wholeBucketVal += prev->second; 818 } 819 for (auto& tracker : mAnomalyTrackers) { 820 tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, mMetricId, eventKey, 821 wholeBucketVal); 822 } 823 } 824 } 825 826 // For pulled metrics, we always need to make sure we do a pull before flushing the bucket 827 // if mCondition is true! 828 void ValueMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) { 829 int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs(); 830 if (eventTimeNs < currentBucketEndTimeNs) { 831 VLOG("eventTime is %lld, less than next bucket start time %lld", (long long)eventTimeNs, 832 (long long)(currentBucketEndTimeNs)); 833 return; 834 } 835 int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs); 836 int64_t nextBucketStartTimeNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs; 837 flushCurrentBucketLocked(eventTimeNs, nextBucketStartTimeNs); 838 } 839 840 int64_t ValueMetricProducer::calcBucketsForwardCount(const int64_t& eventTimeNs) const { 841 int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs(); 842 if (eventTimeNs < currentBucketEndTimeNs) { 843 return 0; 844 } 845 return 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs; 846 } 847 848 void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, 849 const int64_t& nextBucketStartTimeNs) { 850 if (mCondition == ConditionState::kUnknown) { 851 StatsdStats::getInstance().noteBucketUnknownCondition(mMetricId); 852 } 853 854 int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs); 855 if (numBucketsForward > 1) { 856 VLOG("Skipping forward %lld buckets", (long long)numBucketsForward); 857 
StatsdStats::getInstance().noteSkippedForwardBuckets(mMetricId);
        // Something went wrong. Maybe the device was sleeping for a long time. It is better
        // to mark the current bucket as invalid. The last pull might have been successful though.
        invalidateCurrentBucketWithoutResetBase();
    }

    VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs,
         (int)mCurrentSlicedBucket.size());
    int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
    // The bucket may close early (partial bucket) when the triggering event precedes
    // the scheduled full-bucket boundary.
    int64_t bucketEndTime = eventTimeNs < fullBucketEndTimeNs ? eventTimeNs : fullBucketEndTimeNs;
    // Close the current bucket.
    int64_t conditionTrueDuration = mConditionTimer.newBucketStart(bucketEndTime);
    bool isBucketLargeEnough = bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs;
    if (isBucketLargeEnough && !mCurrentBucketIsInvalid) {
        // The current bucket is large enough to keep.
        for (const auto& slice : mCurrentSlicedBucket) {
            ValueBucket bucket = buildPartialBucket(bucketEndTime, slice.second);
            bucket.mConditionTrueNs = conditionTrueDuration;
            // it will auto create new vector of ValuebucketInfo if the key is not found.
            if (bucket.valueIndex.size() > 0) {
                auto& bucketList = mPastBuckets[slice.first];
                bucketList.push_back(bucket);
            }
        }
    } else {
        // Too small or invalid: record the span as skipped rather than reporting data.
        mSkippedBuckets.emplace_back(mCurrentBucketStartTimeNs, bucketEndTime);
    }

    appendToFullBucket(eventTimeNs, fullBucketEndTimeNs);
    initCurrentSlicedBucket(nextBucketStartTimeNs);
    // Update the condition timer again, in case we skipped buckets.
    mConditionTimer.newBucketStart(nextBucketStartTimeNs);
    mCurrentBucketNum += numBucketsForward;
}

// Assembles the ValueBucket for one dimension key from its aggregated intervals.
// Intervals without a value (or with a zero diff when skip_zero_diff_output is set)
// are omitted. For AVG the accumulated sum is divided by the sample count here;
// all other aggregation types report the accumulated value directly.
ValueBucket ValueMetricProducer::buildPartialBucket(int64_t bucketEndTime,
                                                    const std::vector<Interval>& intervals) {
    ValueBucket bucket;
    bucket.mBucketStartNs = mCurrentBucketStartTimeNs;
    bucket.mBucketEndNs = bucketEndTime;
    for (const auto& interval : intervals) {
        if (interval.hasValue) {
            // skip the output if the diff is zero
            if (mSkipZeroDiffOutput && mUseDiff && interval.value.isZero()) {
                continue;
            }
            bucket.valueIndex.push_back(interval.valueIndex);
            if (mAggregationType != ValueMetric::AVG) {
                bucket.values.push_back(interval.value);
            } else {
                // AVG: values were summed while the bucket was open; average now.
                double sum = interval.value.type == LONG ? (double)interval.value.long_value
                                                         : interval.value.double_value;
                bucket.values.push_back(Value((double)sum / interval.sampleSize));
            }
        }
    }
    return bucket;
}

// Resets the per-bucket aggregation state for the bucket starting at
// nextBucketStartTimeNs. Slices that saw no new data during the finished bucket
// are erased entirely so the map does not grow without bound.
void ValueMetricProducer::initCurrentSlicedBucket(int64_t nextBucketStartTimeNs) {
    StatsdStats::getInstance().noteBucketCount(mMetricId);
    // Cleanup data structure to aggregate values.
    for (auto it = mCurrentSlicedBucket.begin(); it != mCurrentSlicedBucket.end();) {
        bool obsolete = true;
        for (auto& interval : it->second) {
            interval.hasValue = false;
            interval.sampleSize = 0;
            if (interval.seenNewData) {
                obsolete = false;
            }
            interval.seenNewData = false;
        }

        if (obsolete) {
            // No interval in this slice saw data; erase() yields the next iterator.
            it = mCurrentSlicedBucket.erase(it);
        } else {
            it++;
        }
    }

    mCurrentBucketIsInvalid = false;
    // If we do not have a global base when the condition is true,
    // we will have incomplete bucket for the next bucket.
940 if (mUseDiff && !mHasGlobalBase && mCondition) { 941 mCurrentBucketIsInvalid = false; 942 } 943 mCurrentBucketStartTimeNs = nextBucketStartTimeNs; 944 VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId, 945 (long long)mCurrentBucketStartTimeNs); 946 } 947 948 void ValueMetricProducer::appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs) { 949 bool isFullBucketReached = eventTimeNs > fullBucketEndTimeNs; 950 if (mCurrentBucketIsInvalid) { 951 if (isFullBucketReached) { 952 // If the bucket is invalid, we ignore the full bucket since it contains invalid data. 953 mCurrentFullBucket.clear(); 954 } 955 // Current bucket is invalid, we do not add it to the full bucket. 956 return; 957 } 958 959 if (isFullBucketReached) { // If full bucket, send to anomaly tracker. 960 // Accumulate partial buckets with current value and then send to anomaly tracker. 961 if (mCurrentFullBucket.size() > 0) { 962 for (const auto& slice : mCurrentSlicedBucket) { 963 if (hitFullBucketGuardRailLocked(slice.first)) { 964 continue; 965 } 966 // TODO: fix this when anomaly can accept double values 967 auto& interval = slice.second[0]; 968 if (interval.hasValue) { 969 mCurrentFullBucket[slice.first] += interval.value.long_value; 970 } 971 } 972 for (const auto& slice : mCurrentFullBucket) { 973 for (auto& tracker : mAnomalyTrackers) { 974 if (tracker != nullptr) { 975 tracker->addPastBucket(slice.first, slice.second, mCurrentBucketNum); 976 } 977 } 978 } 979 mCurrentFullBucket.clear(); 980 } else { 981 // Skip aggregating the partial buckets since there's no previous partial bucket. 
982 for (const auto& slice : mCurrentSlicedBucket) { 983 for (auto& tracker : mAnomalyTrackers) { 984 if (tracker != nullptr) { 985 // TODO: fix this when anomaly can accept double values 986 auto& interval = slice.second[0]; 987 if (interval.hasValue) { 988 tracker->addPastBucket(slice.first, interval.value.long_value, 989 mCurrentBucketNum); 990 } 991 } 992 } 993 } 994 } 995 } else { 996 // Accumulate partial bucket. 997 for (const auto& slice : mCurrentSlicedBucket) { 998 // TODO: fix this when anomaly can accept double values 999 auto& interval = slice.second[0]; 1000 if (interval.hasValue) { 1001 mCurrentFullBucket[slice.first] += interval.value.long_value; 1002 } 1003 } 1004 } 1005 } 1006 1007 size_t ValueMetricProducer::byteSizeLocked() const { 1008 size_t totalSize = 0; 1009 for (const auto& pair : mPastBuckets) { 1010 totalSize += pair.second.size() * kBucketSize; 1011 } 1012 return totalSize; 1013 } 1014 1015 } // namespace statsd 1016 } // namespace os 1017 } // namespace android 1018