1 /* 2 * Copyright (C) 2017 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #define DEBUG false // STOPSHIP if true 18 #include "Log.h" 19 20 #include "../guardrail/StatsdStats.h" 21 #include "GaugeMetricProducer.h" 22 #include "../stats_log_util.h" 23 24 #include <cutils/log.h> 25 26 using android::util::FIELD_COUNT_REPEATED; 27 using android::util::FIELD_TYPE_BOOL; 28 using android::util::FIELD_TYPE_FLOAT; 29 using android::util::FIELD_TYPE_INT32; 30 using android::util::FIELD_TYPE_INT64; 31 using android::util::FIELD_TYPE_MESSAGE; 32 using android::util::FIELD_TYPE_STRING; 33 using android::util::ProtoOutputStream; 34 using std::map; 35 using std::string; 36 using std::unordered_map; 37 using std::vector; 38 using std::make_shared; 39 using std::shared_ptr; 40 41 namespace android { 42 namespace os { 43 namespace statsd { 44 45 // for StatsLogReport 46 const int FIELD_ID_ID = 1; 47 const int FIELD_ID_GAUGE_METRICS = 8; 48 const int FIELD_ID_TIME_BASE = 9; 49 const int FIELD_ID_BUCKET_SIZE = 10; 50 const int FIELD_ID_DIMENSION_PATH_IN_WHAT = 11; 51 const int FIELD_ID_DIMENSION_PATH_IN_CONDITION = 12; 52 const int FIELD_ID_IS_ACTIVE = 14; 53 // for GaugeMetricDataWrapper 54 const int FIELD_ID_DATA = 1; 55 const int FIELD_ID_SKIPPED = 2; 56 const int FIELD_ID_SKIPPED_START_MILLIS = 3; 57 const int FIELD_ID_SKIPPED_END_MILLIS = 4; 58 // for GaugeMetricData 59 const int FIELD_ID_DIMENSION_IN_WHAT = 1; 60 const int FIELD_ID_DIMENSION_IN_CONDITION = 2; 61 const int FIELD_ID_BUCKET_INFO = 3; 62 const int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4; 63 const int FIELD_ID_DIMENSION_LEAF_IN_CONDITION = 5; 64 // for GaugeBucketInfo 65 const int FIELD_ID_ATOM = 3; 66 const int FIELD_ID_ELAPSED_ATOM_TIMESTAMP = 4; 67 const int FIELD_ID_BUCKET_NUM = 6; 68 const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 7; 69 const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 8; 70 71 GaugeMetricProducer::GaugeMetricProducer( 72 const ConfigKey& key, const GaugeMetric& metric, const int conditionIndex, 73 const sp<ConditionWizard>& wizard, const int whatMatcherIndex, 74 const sp<EventMatcherWizard>& matcherWizard, const int pullTagId, const int triggerAtomId, 75 const int atomId, const int64_t timeBaseNs, const int64_t startTimeNs, 76 const sp<StatsPullerManager>& pullerManager) 77 : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, wizard), 78 mWhatMatcherIndex(whatMatcherIndex), 79 mEventMatcherWizard(matcherWizard), 80 mPullerManager(pullerManager), 81 mPullTagId(pullTagId), 82 mTriggerAtomId(triggerAtomId), 83 mAtomId(atomId), 84 mIsPulled(pullTagId != -1), 85 mMinBucketSizeNs(metric.min_bucket_size_nanos()), 86 mMaxPullDelayNs(metric.max_pull_delay_sec() > 0 ? metric.max_pull_delay_sec() * NS_PER_SEC 87 : StatsdStats::kPullMaxDelayNs), 88 mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) != 89 StatsdStats::kAtomDimensionKeySizeLimitMap.end() 90 ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first 91 : StatsdStats::kDimensionKeySizeSoftLimit), 92 mDimensionHardLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) != 93 StatsdStats::kAtomDimensionKeySizeLimitMap.end() 94 ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).second 95 : StatsdStats::kDimensionKeySizeHardLimit), 96 mGaugeAtomsPerDimensionLimit(metric.max_num_gauge_atoms_per_bucket()), 97 mSplitBucketForAppUpgrade(metric.split_bucket_for_app_upgrade()) { 98 mCurrentSlicedBucket = std::make_shared<DimToGaugeAtomsMap>(); 99 mCurrentSlicedBucketForAnomaly = std::make_shared<DimToValMap>(); 100 int64_t bucketSizeMills = 0; 101 if (metric.has_bucket()) { 102 bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket()); 103 } else { 104 bucketSizeMills = TimeUnitToBucketSizeInMillis(ONE_HOUR); 105 } 106 mBucketSizeNs = bucketSizeMills * 1000000; 107 108 mSamplingType = metric.sampling_type(); 109 if (!metric.gauge_fields_filter().include_all()) { 110 translateFieldMatcher(metric.gauge_fields_filter().fields(), &mFieldMatchers); 111 } 112 113 if (metric.has_dimensions_in_what()) { 114 translateFieldMatcher(metric.dimensions_in_what(), &mDimensionsInWhat); 115 mContainANYPositionInDimensionsInWhat = HasPositionANY(metric.dimensions_in_what()); 116 } 117 118 if (metric.has_dimensions_in_condition()) { 119 translateFieldMatcher(metric.dimensions_in_condition(), &mDimensionsInCondition); 120 } 121 122 if (metric.links().size() > 0) { 123 for (const auto& link : metric.links()) { 124 Metric2Condition mc; 125 mc.conditionId = link.condition(); 126 translateFieldMatcher(link.fields_in_what(), &mc.metricFields); 127 translateFieldMatcher(link.fields_in_condition(), &mc.conditionFields); 128 mMetric2ConditionLinks.push_back(mc); 129 } 130 } 131 mConditionSliced = (metric.links().size() > 0) || (mDimensionsInCondition.size() > 0); 132 mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what()) || 133 HasPositionALL(metric.dimensions_in_condition()); 134 135 flushIfNeededLocked(startTimeNs); 136 // Kicks off the puller immediately. 137 if (mIsPulled && mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) { 138 mPullerManager->RegisterReceiver(mPullTagId, this, getCurrentBucketEndTimeNs(), 139 mBucketSizeNs); 140 } 141 142 // Adjust start for partial bucket 143 mCurrentBucketStartTimeNs = startTimeNs; 144 145 VLOG("Gauge metric %lld created. bucket size %lld start_time: %lld sliced %d", 146 (long long)metric.id(), (long long)mBucketSizeNs, (long long)mTimeBaseNs, 147 mConditionSliced); 148 } 149 150 GaugeMetricProducer::~GaugeMetricProducer() { 151 VLOG("~GaugeMetricProducer() called"); 152 if (mIsPulled && mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) { 153 mPullerManager->UnRegisterReceiver(mPullTagId, this); 154 } 155 } 156 157 void GaugeMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const { 158 if (mCurrentSlicedBucket == nullptr || 159 mCurrentSlicedBucket->size() == 0) { 160 return; 161 } 162 163 fprintf(out, "GaugeMetric %lld dimension size %lu\n", (long long)mMetricId, 164 (unsigned long)mCurrentSlicedBucket->size()); 165 if (verbose) { 166 for (const auto& it : *mCurrentSlicedBucket) { 167 fprintf(out, "\t(what)%s\t(condition)%s %d atoms\n", 168 it.first.getDimensionKeyInWhat().toString().c_str(), 169 it.first.getDimensionKeyInCondition().toString().c_str(), 170 (int)it.second.size()); 171 } 172 } 173 } 174 175 void GaugeMetricProducer::clearPastBucketsLocked(const int64_t dumpTimeNs) { 176 flushIfNeededLocked(dumpTimeNs); 177 mPastBuckets.clear(); 178 mSkippedBuckets.clear(); 179 } 180 181 void GaugeMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, 182 const bool include_current_partial_bucket, 183 const bool erase_data, 184 const DumpLatency dumpLatency, 185 std::set<string> *str_set, 186 ProtoOutputStream* protoOutput) { 187 VLOG("Gauge metric %lld report now...", (long long)mMetricId); 188 if (include_current_partial_bucket) { 189 flushLocked(dumpTimeNs); 190 } else { 191 flushIfNeededLocked(dumpTimeNs); 192 } 193 194 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId); 195 protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_IS_ACTIVE, isActiveLocked()); 196 197 if (mPastBuckets.empty()) { 198 return; 199 } 200 201 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_TIME_BASE, (long long)mTimeBaseNs); 202 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_SIZE, (long long)mBucketSizeNs); 203 204 // Fills the dimension path if not slicing by ALL. 205 if (!mSliceByPositionALL) { 206 if (!mDimensionsInWhat.empty()) { 207 uint64_t dimenPathToken = protoOutput->start( 208 FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_WHAT); 209 writeDimensionPathToProto(mDimensionsInWhat, protoOutput); 210 protoOutput->end(dimenPathToken); 211 } 212 if (!mDimensionsInCondition.empty()) { 213 uint64_t dimenPathToken = protoOutput->start( 214 FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_CONDITION); 215 writeDimensionPathToProto(mDimensionsInCondition, protoOutput); 216 protoOutput->end(dimenPathToken); 217 } 218 } 219 220 uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS); 221 222 for (const auto& pair : mSkippedBuckets) { 223 uint64_t wrapperToken = 224 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED); 225 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START_MILLIS, 226 (long long)(NanoToMillis(pair.first))); 227 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END_MILLIS, 228 (long long)(NanoToMillis(pair.second))); 229 protoOutput->end(wrapperToken); 230 } 231 232 for (const auto& pair : mPastBuckets) { 233 const MetricDimensionKey& dimensionKey = pair.first; 234 235 VLOG("Gauge dimension key %s", dimensionKey.toString().c_str()); 236 uint64_t wrapperToken = 237 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA); 238 239 // First fill dimension. 240 if (mSliceByPositionALL) { 241 uint64_t dimensionToken = protoOutput->start( 242 FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT); 243 writeDimensionToProto(dimensionKey.getDimensionKeyInWhat(), str_set, protoOutput); 244 protoOutput->end(dimensionToken); 245 246 if (dimensionKey.hasDimensionKeyInCondition()) { 247 uint64_t dimensionInConditionToken = protoOutput->start( 248 FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION); 249 writeDimensionToProto(dimensionKey.getDimensionKeyInCondition(), 250 str_set, protoOutput); 251 protoOutput->end(dimensionInConditionToken); 252 } 253 } else { 254 writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInWhat(), 255 FIELD_ID_DIMENSION_LEAF_IN_WHAT, str_set, protoOutput); 256 if (dimensionKey.hasDimensionKeyInCondition()) { 257 writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInCondition(), 258 FIELD_ID_DIMENSION_LEAF_IN_CONDITION, 259 str_set, protoOutput); 260 } 261 } 262 263 // Then fill bucket_info (GaugeBucketInfo). 264 for (const auto& bucket : pair.second) { 265 uint64_t bucketInfoToken = protoOutput->start( 266 FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO); 267 268 if (bucket.mBucketEndNs - bucket.mBucketStartNs != mBucketSizeNs) { 269 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_ELAPSED_MILLIS, 270 (long long)NanoToMillis(bucket.mBucketStartNs)); 271 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_ELAPSED_MILLIS, 272 (long long)NanoToMillis(bucket.mBucketEndNs)); 273 } else { 274 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_NUM, 275 (long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs))); 276 } 277 278 if (!bucket.mGaugeAtoms.empty()) { 279 for (const auto& atom : bucket.mGaugeAtoms) { 280 uint64_t atomsToken = 281 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | 282 FIELD_ID_ATOM); 283 writeFieldValueTreeToStream(mAtomId, *(atom.mFields), protoOutput); 284 protoOutput->end(atomsToken); 285 } 286 for (const auto& atom : bucket.mGaugeAtoms) { 287 const int64_t elapsedTimestampNs = 288 truncateTimestampIfNecessary(mAtomId, atom.mElapsedTimestamps); 289 protoOutput->write( 290 FIELD_TYPE_INT64 | FIELD_COUNT_REPEATED | FIELD_ID_ELAPSED_ATOM_TIMESTAMP, 291 (long long)elapsedTimestampNs); 292 } 293 } 294 protoOutput->end(bucketInfoToken); 295 VLOG("Gauge \t bucket [%lld - %lld] includes %d atoms.", 296 (long long)bucket.mBucketStartNs, (long long)bucket.mBucketEndNs, 297 (int)bucket.mGaugeAtoms.size()); 298 } 299 protoOutput->end(wrapperToken); 300 } 301 protoOutput->end(protoToken); 302 303 304 if (erase_data) { 305 mPastBuckets.clear(); 306 mSkippedBuckets.clear(); 307 } 308 } 309 310 void GaugeMetricProducer::prepareFirstBucketLocked() { 311 if (mIsActive && mIsPulled && mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) { 312 pullAndMatchEventsLocked(mCurrentBucketStartTimeNs); 313 } 314 } 315 316 void GaugeMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) { 317 bool triggerPuller = false; 318 switch(mSamplingType) { 319 // When the metric wants to do random sampling and there is already one gauge atom for the 320 // current bucket, do not do it again. 321 case GaugeMetric::RANDOM_ONE_SAMPLE: { 322 triggerPuller = mCondition == ConditionState::kTrue && mCurrentSlicedBucket->empty(); 323 break; 324 } 325 case GaugeMetric::CONDITION_CHANGE_TO_TRUE: { 326 triggerPuller = mCondition == ConditionState::kTrue; 327 break; 328 } 329 case GaugeMetric::FIRST_N_SAMPLES: { 330 triggerPuller = mCondition == ConditionState::kTrue; 331 break; 332 } 333 default: 334 break; 335 } 336 if (!triggerPuller) { 337 return; 338 } 339 vector<std::shared_ptr<LogEvent>> allData; 340 if (!mPullerManager->Pull(mPullTagId, &allData)) { 341 ALOGE("Gauge Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs); 342 return; 343 } 344 const int64_t pullDelayNs = getElapsedRealtimeNs() - timestampNs; 345 if (pullDelayNs > mMaxPullDelayNs) { 346 ALOGE("Pull finish too late for atom %d", mPullTagId); 347 StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId); 348 StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs); 349 return; 350 } 351 StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs); 352 for (const auto& data : allData) { 353 LogEvent localCopy = data->makeCopy(); 354 localCopy.setElapsedTimestampNs(timestampNs); 355 if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) == 356 MatchingState::kMatched) { 357 onMatchedLogEventLocked(mWhatMatcherIndex, localCopy); 358 } 359 } 360 } 361 362 void GaugeMetricProducer::onActiveStateChangedLocked(const int64_t& eventTimeNs) { 363 MetricProducer::onActiveStateChangedLocked(eventTimeNs); 364 if (ConditionState::kTrue != mCondition || !mIsPulled) { 365 return; 366 } 367 if (mTriggerAtomId == -1 || (mIsActive && mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE)) { 368 pullAndMatchEventsLocked(eventTimeNs); 369 } 370 371 } 372 373 void GaugeMetricProducer::onConditionChangedLocked(const bool conditionMet, 374 const int64_t eventTimeNs) { 375 VLOG("GaugeMetric %lld onConditionChanged", (long long)mMetricId); 376 377 mCondition = conditionMet ? ConditionState::kTrue : ConditionState::kFalse; 378 if (!mIsActive) { 379 return; 380 } 381 382 flushIfNeededLocked(eventTimeNs); 383 if (mIsPulled && mTriggerAtomId == -1) { 384 pullAndMatchEventsLocked(eventTimeNs); 385 } // else: Push mode. No need to proactively pull the gauge data. 386 } 387 388 void GaugeMetricProducer::onSlicedConditionMayChangeLocked(bool overallCondition, 389 const int64_t eventTimeNs) { 390 VLOG("GaugeMetric %lld onSlicedConditionMayChange overall condition %d", (long long)mMetricId, 391 overallCondition); 392 mCondition = overallCondition ? ConditionState::kTrue : ConditionState::kFalse; 393 if (!mIsActive) { 394 return; 395 } 396 397 flushIfNeededLocked(eventTimeNs); 398 // If the condition is sliced, mCondition is true if any of the dimensions is true. And we will 399 // pull for every dimension. 400 if (mIsPulled && mTriggerAtomId == -1) { 401 pullAndMatchEventsLocked(eventTimeNs); 402 } // else: Push mode. No need to proactively pull the gauge data. 403 } 404 405 std::shared_ptr<vector<FieldValue>> GaugeMetricProducer::getGaugeFields(const LogEvent& event) { 406 std::shared_ptr<vector<FieldValue>> gaugeFields; 407 if (mFieldMatchers.size() > 0) { 408 gaugeFields = std::make_shared<vector<FieldValue>>(); 409 filterGaugeValues(mFieldMatchers, event.getValues(), gaugeFields.get()); 410 } else { 411 gaugeFields = std::make_shared<vector<FieldValue>>(event.getValues()); 412 } 413 // Trim all dimension fields from output. Dimensions will appear in output report and will 414 // benefit from dictionary encoding. For large pulled atoms, this can give the benefit of 415 // optional repeated field. 416 for (const auto& field : mDimensionsInWhat) { 417 for (auto it = gaugeFields->begin(); it != gaugeFields->end();) { 418 if (it->mField.matches(field)) { 419 it = gaugeFields->erase(it); 420 } else { 421 it++; 422 } 423 } 424 } 425 return gaugeFields; 426 } 427 428 void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData, 429 bool pullSuccess, int64_t originalPullTimeNs) { 430 std::lock_guard<std::mutex> lock(mMutex); 431 if (!pullSuccess || allData.size() == 0) { 432 return; 433 } 434 for (const auto& data : allData) { 435 if (mEventMatcherWizard->matchLogEvent( 436 *data, mWhatMatcherIndex) == MatchingState::kMatched) { 437 onMatchedLogEventLocked(mWhatMatcherIndex, *data); 438 } 439 } 440 } 441 442 bool GaugeMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) { 443 if (mCurrentSlicedBucket->find(newKey) != mCurrentSlicedBucket->end()) { 444 return false; 445 } 446 // 1. Report the tuple count if the tuple count > soft limit 447 if (mCurrentSlicedBucket->size() > mDimensionSoftLimit - 1) { 448 size_t newTupleCount = mCurrentSlicedBucket->size() + 1; 449 StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount); 450 // 2. Don't add more tuples, we are above the allowed threshold. Drop the data. 451 if (newTupleCount > mDimensionHardLimit) { 452 ALOGE("GaugeMetric %lld dropping data for dimension key %s", 453 (long long)mMetricId, newKey.toString().c_str()); 454 return true; 455 } 456 } 457 458 return false; 459 } 460 461 void GaugeMetricProducer::onMatchedLogEventInternalLocked( 462 const size_t matcherIndex, const MetricDimensionKey& eventKey, 463 const ConditionKey& conditionKey, bool condition, 464 const LogEvent& event) { 465 if (condition == false) { 466 return; 467 } 468 int64_t eventTimeNs = event.GetElapsedTimestampNs(); 469 if (eventTimeNs < mCurrentBucketStartTimeNs) { 470 VLOG("Gauge Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs, 471 (long long)mCurrentBucketStartTimeNs); 472 return; 473 } 474 flushIfNeededLocked(eventTimeNs); 475 476 if (mTriggerAtomId == event.GetTagId()) { 477 pullAndMatchEventsLocked(eventTimeNs); 478 return; 479 } 480 481 // When gauge metric wants to randomly sample the output atom, we just simply use the first 482 // gauge in the given bucket. 483 if (mCurrentSlicedBucket->find(eventKey) != mCurrentSlicedBucket->end() && 484 mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) { 485 return; 486 } 487 if (hitGuardRailLocked(eventKey)) { 488 return; 489 } 490 if ((*mCurrentSlicedBucket)[eventKey].size() >= mGaugeAtomsPerDimensionLimit) { 491 return; 492 } 493 GaugeAtom gaugeAtom(getGaugeFields(event), eventTimeNs); 494 (*mCurrentSlicedBucket)[eventKey].push_back(gaugeAtom); 495 // Anomaly detection on gauge metric only works when there is one numeric 496 // field specified. 497 if (mAnomalyTrackers.size() > 0) { 498 if (gaugeAtom.mFields->size() == 1) { 499 const Value& value = gaugeAtom.mFields->begin()->mValue; 500 long gaugeVal = 0; 501 if (value.getType() == INT) { 502 gaugeVal = (long)value.int_value; 503 } else if (value.getType() == LONG) { 504 gaugeVal = value.long_value; 505 } 506 for (auto& tracker : mAnomalyTrackers) { 507 tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, mMetricId, 508 eventKey, gaugeVal); 509 } 510 } 511 } 512 } 513 514 void GaugeMetricProducer::updateCurrentSlicedBucketForAnomaly() { 515 for (const auto& slice : *mCurrentSlicedBucket) { 516 if (slice.second.empty()) { 517 continue; 518 } 519 const Value& value = slice.second.front().mFields->front().mValue; 520 long gaugeVal = 0; 521 if (value.getType() == INT) { 522 gaugeVal = (long)value.int_value; 523 } else if (value.getType() == LONG) { 524 gaugeVal = value.long_value; 525 } 526 (*mCurrentSlicedBucketForAnomaly)[slice.first] = gaugeVal; 527 } 528 } 529 530 void GaugeMetricProducer::dropDataLocked(const int64_t dropTimeNs) { 531 flushIfNeededLocked(dropTimeNs); 532 StatsdStats::getInstance().noteBucketDropped(mMetricId); 533 mPastBuckets.clear(); 534 } 535 536 // When a new matched event comes in, we check if event falls into the current 537 // bucket. If not, flush the old counter to past buckets and initialize the new 538 // bucket. 539 // if data is pushed, onMatchedLogEvent will only be called through onConditionChanged() inside 540 // the GaugeMetricProducer while holding the lock. 541 void GaugeMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) { 542 int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs(); 543 544 if (eventTimeNs < currentBucketEndTimeNs) { 545 VLOG("Gauge eventTime is %lld, less than next bucket start time %lld", 546 (long long)eventTimeNs, (long long)(mCurrentBucketStartTimeNs + mBucketSizeNs)); 547 return; 548 } 549 550 // Adjusts the bucket start and end times. 551 int64_t numBucketsForward = 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs; 552 int64_t nextBucketNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs; 553 flushCurrentBucketLocked(eventTimeNs, nextBucketNs); 554 555 mCurrentBucketNum += numBucketsForward; 556 VLOG("Gauge metric %lld: new bucket start time: %lld", (long long)mMetricId, 557 (long long)mCurrentBucketStartTimeNs); 558 } 559 560 void GaugeMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, 561 const int64_t& nextBucketStartTimeNs) { 562 int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs(); 563 564 GaugeBucket info; 565 info.mBucketStartNs = mCurrentBucketStartTimeNs; 566 if (eventTimeNs < fullBucketEndTimeNs) { 567 info.mBucketEndNs = eventTimeNs; 568 } else { 569 info.mBucketEndNs = fullBucketEndTimeNs; 570 } 571 572 if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) { 573 for (const auto& slice : *mCurrentSlicedBucket) { 574 info.mGaugeAtoms = slice.second; 575 auto& bucketList = mPastBuckets[slice.first]; 576 bucketList.push_back(info); 577 VLOG("Gauge gauge metric %lld, dump key value: %s", (long long)mMetricId, 578 slice.first.toString().c_str()); 579 } 580 } else { 581 mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs); 582 } 583 584 // If we have anomaly trackers, we need to update the partial bucket values. 585 if (mAnomalyTrackers.size() > 0) { 586 updateCurrentSlicedBucketForAnomaly(); 587 588 if (eventTimeNs > fullBucketEndTimeNs) { 589 // This is known to be a full bucket, so send this data to the anomaly tracker. 590 for (auto& tracker : mAnomalyTrackers) { 591 tracker->addPastBucket(mCurrentSlicedBucketForAnomaly, mCurrentBucketNum); 592 } 593 mCurrentSlicedBucketForAnomaly = std::make_shared<DimToValMap>(); 594 } 595 } 596 597 StatsdStats::getInstance().noteBucketCount(mMetricId); 598 mCurrentSlicedBucket = std::make_shared<DimToGaugeAtomsMap>(); 599 mCurrentBucketStartTimeNs = nextBucketStartTimeNs; 600 } 601 602 size_t GaugeMetricProducer::byteSizeLocked() const { 603 size_t totalSize = 0; 604 for (const auto& pair : mPastBuckets) { 605 for (const auto& bucket : pair.second) { 606 totalSize += bucket.mGaugeAtoms.size() * sizeof(GaugeAtom); 607 for (const auto& atom : bucket.mGaugeAtoms) { 608 if (atom.mFields != nullptr) { 609 totalSize += atom.mFields->size() * sizeof(FieldValue); 610 } 611 } 612 } 613 } 614 return totalSize; 615 } 616 617 } // namespace statsd 618 } // namespace os 619 } // namespace android 620