Home | History | Annotate | Download | only in metrics
      1 /*
      2 * Copyright (C) 2017 The Android Open Source Project
      3 *
      4 * Licensed under the Apache License, Version 2.0 (the "License");
      5 * you may not use this file except in compliance with the License.
      6 * You may obtain a copy of the License at
      7 *
      8 *      http://www.apache.org/licenses/LICENSE-2.0
      9 *
     10 * Unless required by applicable law or agreed to in writing, software
     11 * distributed under the License is distributed on an "AS IS" BASIS,
     12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13 * See the License for the specific language governing permissions and
     14 * limitations under the License.
     15 */
     16 
     17 #define DEBUG false  // STOPSHIP if true
     18 #include "Log.h"
     19 
     20 #include "../guardrail/StatsdStats.h"
     21 #include "GaugeMetricProducer.h"
     22 #include "../stats_log_util.h"
     23 
     24 #include <cutils/log.h>
     25 
     26 using android::util::FIELD_COUNT_REPEATED;
     27 using android::util::FIELD_TYPE_BOOL;
     28 using android::util::FIELD_TYPE_FLOAT;
     29 using android::util::FIELD_TYPE_INT32;
     30 using android::util::FIELD_TYPE_INT64;
     31 using android::util::FIELD_TYPE_MESSAGE;
     32 using android::util::FIELD_TYPE_STRING;
     33 using android::util::ProtoOutputStream;
     34 using std::map;
     35 using std::string;
     36 using std::unordered_map;
     37 using std::vector;
     38 using std::make_shared;
     39 using std::shared_ptr;
     40 
     41 namespace android {
     42 namespace os {
     43 namespace statsd {
     44 
     45 // for StatsLogReport
     46 const int FIELD_ID_ID = 1;
     47 const int FIELD_ID_GAUGE_METRICS = 8;
     48 const int FIELD_ID_TIME_BASE = 9;
     49 const int FIELD_ID_BUCKET_SIZE = 10;
     50 const int FIELD_ID_DIMENSION_PATH_IN_WHAT = 11;
     51 const int FIELD_ID_DIMENSION_PATH_IN_CONDITION = 12;
     52 // for GaugeMetricDataWrapper
     53 const int FIELD_ID_DATA = 1;
     54 const int FIELD_ID_SKIPPED = 2;
     55 const int FIELD_ID_SKIPPED_START_MILLIS = 3;
     56 const int FIELD_ID_SKIPPED_END_MILLIS = 4;
     57 // for GaugeMetricData
     58 const int FIELD_ID_DIMENSION_IN_WHAT = 1;
     59 const int FIELD_ID_DIMENSION_IN_CONDITION = 2;
     60 const int FIELD_ID_BUCKET_INFO = 3;
     61 const int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4;
     62 const int FIELD_ID_DIMENSION_LEAF_IN_CONDITION = 5;
     63 // for GaugeBucketInfo
     64 const int FIELD_ID_ATOM = 3;
     65 const int FIELD_ID_ELAPSED_ATOM_TIMESTAMP = 4;
     66 const int FIELD_ID_WALL_CLOCK_ATOM_TIMESTAMP = 5;
     67 const int FIELD_ID_BUCKET_NUM = 6;
     68 const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 7;
     69 const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 8;
     70 
     71 GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric,
     72                                          const int conditionIndex,
     73                                          const sp<ConditionWizard>& wizard, const int pullTagId,
     74                                          const int64_t timeBaseNs, const int64_t startTimeNs,
     75                                          shared_ptr<StatsPullerManager> statsPullerManager)
     76     : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, wizard),
     77       mStatsPullerManager(statsPullerManager),
     78       mPullTagId(pullTagId),
     79       mMinBucketSizeNs(metric.min_bucket_size_nanos()),
     80       mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
     81                                           StatsdStats::kAtomDimensionKeySizeLimitMap.end()
     82                                   ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first
     83                                   : StatsdStats::kDimensionKeySizeSoftLimit),
     84       mDimensionHardLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
     85                                           StatsdStats::kAtomDimensionKeySizeLimitMap.end()
     86                                   ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).second
     87                                   : StatsdStats::kDimensionKeySizeHardLimit),
     88       mGaugeAtomsPerDimensionLimit(metric.max_num_gauge_atoms_per_bucket()) {
     89     mCurrentSlicedBucket = std::make_shared<DimToGaugeAtomsMap>();
     90     mCurrentSlicedBucketForAnomaly = std::make_shared<DimToValMap>();
     91     int64_t bucketSizeMills = 0;
     92     if (metric.has_bucket()) {
     93         bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket());
     94     } else {
     95         bucketSizeMills = TimeUnitToBucketSizeInMillis(ONE_HOUR);
     96     }
     97     mBucketSizeNs = bucketSizeMills * 1000000;
     98 
     99     mSamplingType = metric.sampling_type();
    100     if (!metric.gauge_fields_filter().include_all()) {
    101         translateFieldMatcher(metric.gauge_fields_filter().fields(), &mFieldMatchers);
    102     }
    103 
    104     // TODO: use UidMap if uid->pkg_name is required
    105     if (metric.has_dimensions_in_what()) {
    106         translateFieldMatcher(metric.dimensions_in_what(), &mDimensionsInWhat);
    107         mContainANYPositionInDimensionsInWhat = HasPositionANY(metric.dimensions_in_what());
    108     }
    109 
    110     if (metric.has_dimensions_in_condition()) {
    111         translateFieldMatcher(metric.dimensions_in_condition(), &mDimensionsInCondition);
    112     }
    113 
    114     if (metric.links().size() > 0) {
    115         for (const auto& link : metric.links()) {
    116             Metric2Condition mc;
    117             mc.conditionId = link.condition();
    118             translateFieldMatcher(link.fields_in_what(), &mc.metricFields);
    119             translateFieldMatcher(link.fields_in_condition(), &mc.conditionFields);
    120             mMetric2ConditionLinks.push_back(mc);
    121         }
    122     }
    123     mConditionSliced = (metric.links().size() > 0) || (mDimensionsInCondition.size() > 0);
    124     mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what()) ||
    125             HasPositionALL(metric.dimensions_in_condition());
    126 
    127     flushIfNeededLocked(startTimeNs);
    128     // Kicks off the puller immediately.
    129     if (mPullTagId != -1 && mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) {
    130         mStatsPullerManager->RegisterReceiver(
    131                 mPullTagId, this, getCurrentBucketEndTimeNs(), mBucketSizeNs);
    132     }
    133 
    134     VLOG("Gauge metric %lld created. bucket size %lld start_time: %lld sliced %d",
    135          (long long)metric.id(), (long long)mBucketSizeNs, (long long)mTimeBaseNs,
    136          mConditionSliced);
    137 }
    138 
    139 // for testing
    140 GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric,
    141                                          const int conditionIndex,
    142                                          const sp<ConditionWizard>& wizard, const int pullTagId,
    143                                          const int64_t timeBaseNs, const int64_t startTimeNs)
    144     : GaugeMetricProducer(key, metric, conditionIndex, wizard, pullTagId, timeBaseNs, startTimeNs,
    145                           make_shared<StatsPullerManager>()) {
    146 }
    147 
    148 GaugeMetricProducer::~GaugeMetricProducer() {
    149     VLOG("~GaugeMetricProducer() called");
    150     if (mPullTagId != -1 && mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) {
    151         mStatsPullerManager->UnRegisterReceiver(mPullTagId, this);
    152     }
    153 }
    154 
    155 void GaugeMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const {
    156     if (mCurrentSlicedBucket == nullptr ||
    157         mCurrentSlicedBucket->size() == 0) {
    158         return;
    159     }
    160 
    161     fprintf(out, "GaugeMetric %lld dimension size %lu\n", (long long)mMetricId,
    162             (unsigned long)mCurrentSlicedBucket->size());
    163     if (verbose) {
    164         for (const auto& it : *mCurrentSlicedBucket) {
    165             fprintf(out, "\t(what)%s\t(condition)%s  %d atoms\n",
    166                 it.first.getDimensionKeyInWhat().toString().c_str(),
    167                 it.first.getDimensionKeyInCondition().toString().c_str(),
    168                 (int)it.second.size());
    169         }
    170     }
    171 }
    172 
    173 void GaugeMetricProducer::clearPastBucketsLocked(const int64_t dumpTimeNs) {
    174     flushIfNeededLocked(dumpTimeNs);
    175     mPastBuckets.clear();
    176     mSkippedBuckets.clear();
    177 }
    178 
    179 void GaugeMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
    180                                              const bool include_current_partial_bucket,
    181                                              std::set<string> *str_set,
    182                                              ProtoOutputStream* protoOutput) {
    183     VLOG("Gauge metric %lld report now...", (long long)mMetricId);
    184     if (include_current_partial_bucket) {
    185         flushLocked(dumpTimeNs);
    186     } else {
    187         flushIfNeededLocked(dumpTimeNs);
    188     }
    189 
    190     if (mPastBuckets.empty()) {
    191         return;
    192     }
    193 
    194     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
    195     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_TIME_BASE, (long long)mTimeBaseNs);
    196     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_SIZE, (long long)mBucketSizeNs);
    197 
    198     // Fills the dimension path if not slicing by ALL.
    199     if (!mSliceByPositionALL) {
    200         if (!mDimensionsInWhat.empty()) {
    201             uint64_t dimenPathToken = protoOutput->start(
    202                     FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_WHAT);
    203             writeDimensionPathToProto(mDimensionsInWhat, protoOutput);
    204             protoOutput->end(dimenPathToken);
    205         }
    206         if (!mDimensionsInCondition.empty()) {
    207             uint64_t dimenPathToken = protoOutput->start(
    208                     FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_CONDITION);
    209             writeDimensionPathToProto(mDimensionsInCondition, protoOutput);
    210             protoOutput->end(dimenPathToken);
    211         }
    212     }
    213 
    214     uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS);
    215 
    216     for (const auto& pair : mSkippedBuckets) {
    217         uint64_t wrapperToken =
    218                 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED);
    219         protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START_MILLIS,
    220                            (long long)(NanoToMillis(pair.first)));
    221         protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END_MILLIS,
    222                            (long long)(NanoToMillis(pair.second)));
    223         protoOutput->end(wrapperToken);
    224     }
    225     mSkippedBuckets.clear();
    226 
    227     for (const auto& pair : mPastBuckets) {
    228         const MetricDimensionKey& dimensionKey = pair.first;
    229 
    230         VLOG("Gauge dimension key %s", dimensionKey.toString().c_str());
    231         uint64_t wrapperToken =
    232                 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
    233 
    234         // First fill dimension.
    235         if (mSliceByPositionALL) {
    236             uint64_t dimensionToken = protoOutput->start(
    237                     FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
    238             writeDimensionToProto(dimensionKey.getDimensionKeyInWhat(), str_set, protoOutput);
    239             protoOutput->end(dimensionToken);
    240 
    241             if (dimensionKey.hasDimensionKeyInCondition()) {
    242                 uint64_t dimensionInConditionToken = protoOutput->start(
    243                         FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION);
    244                 writeDimensionToProto(dimensionKey.getDimensionKeyInCondition(),
    245                                       str_set, protoOutput);
    246                 protoOutput->end(dimensionInConditionToken);
    247             }
    248         } else {
    249             writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInWhat(),
    250                                            FIELD_ID_DIMENSION_LEAF_IN_WHAT, str_set, protoOutput);
    251             if (dimensionKey.hasDimensionKeyInCondition()) {
    252                 writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInCondition(),
    253                                                FIELD_ID_DIMENSION_LEAF_IN_CONDITION,
    254                                                str_set, protoOutput);
    255             }
    256         }
    257 
    258         // Then fill bucket_info (GaugeBucketInfo).
    259         for (const auto& bucket : pair.second) {
    260             uint64_t bucketInfoToken = protoOutput->start(
    261                     FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
    262 
    263             if (bucket.mBucketEndNs - bucket.mBucketStartNs != mBucketSizeNs) {
    264                 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_ELAPSED_MILLIS,
    265                                    (long long)NanoToMillis(bucket.mBucketStartNs));
    266                 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_ELAPSED_MILLIS,
    267                                    (long long)NanoToMillis(bucket.mBucketEndNs));
    268             } else {
    269                 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_NUM,
    270                                    (long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs)));
    271             }
    272 
    273             if (!bucket.mGaugeAtoms.empty()) {
    274                 for (const auto& atom : bucket.mGaugeAtoms) {
    275                     uint64_t atomsToken =
    276                         protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
    277                                            FIELD_ID_ATOM);
    278                     writeFieldValueTreeToStream(mTagId, *(atom.mFields), protoOutput);
    279                     protoOutput->end(atomsToken);
    280                 }
    281                 const bool truncateTimestamp =
    282                         android::util::AtomsInfo::kNotTruncatingTimestampAtomWhiteList.find(
    283                                 mTagId) ==
    284                         android::util::AtomsInfo::kNotTruncatingTimestampAtomWhiteList.end();
    285                 for (const auto& atom : bucket.mGaugeAtoms) {
    286                     const int64_t elapsedTimestampNs =  truncateTimestamp ?
    287                         truncateTimestampNsToFiveMinutes(atom.mElapsedTimestamps) :
    288                             atom.mElapsedTimestamps;
    289                     const int64_t wallClockNs = truncateTimestamp ?
    290                         truncateTimestampNsToFiveMinutes(atom.mWallClockTimestampNs) :
    291                             atom.mWallClockTimestampNs;
    292                     protoOutput->write(
    293                         FIELD_TYPE_INT64 | FIELD_COUNT_REPEATED | FIELD_ID_ELAPSED_ATOM_TIMESTAMP,
    294                         (long long)elapsedTimestampNs);
    295                     protoOutput->write(
    296                         FIELD_TYPE_INT64 | FIELD_COUNT_REPEATED |
    297                             FIELD_ID_WALL_CLOCK_ATOM_TIMESTAMP,
    298                         (long long)wallClockNs);
    299                 }
    300             }
    301             protoOutput->end(bucketInfoToken);
    302             VLOG("Gauge \t bucket [%lld - %lld] includes %d atoms.",
    303                  (long long)bucket.mBucketStartNs, (long long)bucket.mBucketEndNs,
    304                  (int)bucket.mGaugeAtoms.size());
    305         }
    306         protoOutput->end(wrapperToken);
    307     }
    308     protoOutput->end(protoToken);
    309 
    310     mPastBuckets.clear();
    311     // TODO: Clear mDimensionKeyMap once the report is dumped.
    312 }
    313 
    314 void GaugeMetricProducer::pullLocked(const int64_t timestampNs) {
    315     bool triggerPuller = false;
    316     switch(mSamplingType) {
    317         // When the metric wants to do random sampling and there is already one gauge atom for the
    318         // current bucket, do not do it again.
    319         case GaugeMetric::RANDOM_ONE_SAMPLE: {
    320             triggerPuller = mCondition && mCurrentSlicedBucket->empty();
    321             break;
    322         }
    323         case GaugeMetric::ALL_CONDITION_CHANGES: {
    324             triggerPuller = true;
    325             break;
    326         }
    327         case GaugeMetric::CONDITION_CHANGE_TO_TRUE: {
    328             triggerPuller = mCondition;
    329             break;
    330         }
    331         default:
    332             break;
    333     }
    334     if (!triggerPuller) {
    335         return;
    336     }
    337 
    338     vector<std::shared_ptr<LogEvent>> allData;
    339     if (!mStatsPullerManager->Pull(mPullTagId, timestampNs, &allData)) {
    340         ALOGE("Gauge Stats puller failed for tag: %d", mPullTagId);
    341         return;
    342     }
    343 
    344     for (const auto& data : allData) {
    345         onMatchedLogEventLocked(0, *data);
    346     }
    347 }
    348 
    349 void GaugeMetricProducer::onConditionChangedLocked(const bool conditionMet,
    350                                                    const int64_t eventTimeNs) {
    351     VLOG("GaugeMetric %lld onConditionChanged", (long long)mMetricId);
    352     flushIfNeededLocked(eventTimeNs);
    353     mCondition = conditionMet;
    354 
    355     if (mPullTagId != -1) {
    356         pullLocked(eventTimeNs);
    357     }  // else: Push mode. No need to proactively pull the gauge data.
    358 }
    359 
    360 void GaugeMetricProducer::onSlicedConditionMayChangeLocked(bool overallCondition,
    361                                                            const int64_t eventTimeNs) {
    362     VLOG("GaugeMetric %lld onSlicedConditionMayChange overall condition %d", (long long)mMetricId,
    363          overallCondition);
    364     flushIfNeededLocked(eventTimeNs);
    365     // If the condition is sliced, mCondition is true if any of the dimensions is true. And we will
    366     // pull for every dimension.
    367     mCondition = overallCondition;
    368     if (mPullTagId != -1) {
    369         pullLocked(eventTimeNs);
    370     }  // else: Push mode. No need to proactively pull the gauge data.
    371 }
    372 
    373 std::shared_ptr<vector<FieldValue>> GaugeMetricProducer::getGaugeFields(const LogEvent& event) {
    374     if (mFieldMatchers.size() > 0) {
    375         std::shared_ptr<vector<FieldValue>> gaugeFields = std::make_shared<vector<FieldValue>>();
    376         filterGaugeValues(mFieldMatchers, event.getValues(), gaugeFields.get());
    377         return gaugeFields;
    378     } else {
    379         return std::make_shared<vector<FieldValue>>(event.getValues());
    380     }
    381 }
    382 
    383 void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
    384     std::lock_guard<std::mutex> lock(mMutex);
    385     if (allData.size() == 0) {
    386         return;
    387     }
    388     for (const auto& data : allData) {
    389         onMatchedLogEventLocked(0, *data);
    390     }
    391 }
    392 
    393 bool GaugeMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) {
    394     if (mCurrentSlicedBucket->find(newKey) != mCurrentSlicedBucket->end()) {
    395         return false;
    396     }
    397     // 1. Report the tuple count if the tuple count > soft limit
    398     if (mCurrentSlicedBucket->size() > mDimensionSoftLimit - 1) {
    399         size_t newTupleCount = mCurrentSlicedBucket->size() + 1;
    400         StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount);
    401         // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
    402         if (newTupleCount > mDimensionHardLimit) {
    403             ALOGE("GaugeMetric %lld dropping data for dimension key %s",
    404                 (long long)mMetricId, newKey.toString().c_str());
    405             return true;
    406         }
    407     }
    408 
    409     return false;
    410 }
    411 
    412 void GaugeMetricProducer::onMatchedLogEventInternalLocked(
    413         const size_t matcherIndex, const MetricDimensionKey& eventKey,
    414         const ConditionKey& conditionKey, bool condition,
    415         const LogEvent& event) {
    416     if (condition == false) {
    417         return;
    418     }
    419     int64_t eventTimeNs = event.GetElapsedTimestampNs();
    420     mTagId = event.GetTagId();
    421     if (eventTimeNs < mCurrentBucketStartTimeNs) {
    422         VLOG("Gauge Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
    423              (long long)mCurrentBucketStartTimeNs);
    424         return;
    425     }
    426     flushIfNeededLocked(eventTimeNs);
    427 
    428     // When gauge metric wants to randomly sample the output atom, we just simply use the first
    429     // gauge in the given bucket.
    430     if (mCurrentSlicedBucket->find(eventKey) != mCurrentSlicedBucket->end() &&
    431         mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) {
    432         return;
    433     }
    434     if (hitGuardRailLocked(eventKey)) {
    435         return;
    436     }
    437     if ((*mCurrentSlicedBucket)[eventKey].size() >= mGaugeAtomsPerDimensionLimit) {
    438         return;
    439     }
    440     GaugeAtom gaugeAtom(getGaugeFields(event), eventTimeNs, getWallClockNs());
    441     (*mCurrentSlicedBucket)[eventKey].push_back(gaugeAtom);
    442     // Anomaly detection on gauge metric only works when there is one numeric
    443     // field specified.
    444     if (mAnomalyTrackers.size() > 0) {
    445         if (gaugeAtom.mFields->size() == 1) {
    446             const Value& value = gaugeAtom.mFields->begin()->mValue;
    447             long gaugeVal = 0;
    448             if (value.getType() == INT) {
    449                 gaugeVal = (long)value.int_value;
    450             } else if (value.getType() == LONG) {
    451                 gaugeVal = value.long_value;
    452             }
    453             for (auto& tracker : mAnomalyTrackers) {
    454                 tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey,
    455                                                  gaugeVal);
    456             }
    457         }
    458     }
    459 }
    460 
    461 void GaugeMetricProducer::updateCurrentSlicedBucketForAnomaly() {
    462     for (const auto& slice : *mCurrentSlicedBucket) {
    463         if (slice.second.empty()) {
    464             continue;
    465         }
    466         const Value& value = slice.second.front().mFields->front().mValue;
    467         long gaugeVal = 0;
    468         if (value.getType() == INT) {
    469             gaugeVal = (long)value.int_value;
    470         } else if (value.getType() == LONG) {
    471             gaugeVal = value.long_value;
    472         }
    473         (*mCurrentSlicedBucketForAnomaly)[slice.first] = gaugeVal;
    474     }
    475 }
    476 
    477 void GaugeMetricProducer::dropDataLocked(const int64_t dropTimeNs) {
    478     flushIfNeededLocked(dropTimeNs);
    479     mPastBuckets.clear();
    480 }
    481 
    482 // When a new matched event comes in, we check if event falls into the current
    483 // bucket. If not, flush the old counter to past buckets and initialize the new
    484 // bucket.
    485 // if data is pushed, onMatchedLogEvent will only be called through onConditionChanged() inside
    486 // the GaugeMetricProducer while holding the lock.
    487 void GaugeMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) {
    488     int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
    489 
    490     if (eventTimeNs < currentBucketEndTimeNs) {
    491         VLOG("Gauge eventTime is %lld, less than next bucket start time %lld",
    492              (long long)eventTimeNs, (long long)(mCurrentBucketStartTimeNs + mBucketSizeNs));
    493         return;
    494     }
    495 
    496     flushCurrentBucketLocked(eventTimeNs);
    497 
    498     // Adjusts the bucket start and end times.
    499     int64_t numBucketsForward = 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
    500     mCurrentBucketStartTimeNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
    501     mCurrentBucketNum += numBucketsForward;
    502     VLOG("Gauge metric %lld: new bucket start time: %lld", (long long)mMetricId,
    503          (long long)mCurrentBucketStartTimeNs);
    504 }
    505 
    506 void GaugeMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) {
    507     int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
    508 
    509     GaugeBucket info;
    510     info.mBucketStartNs = mCurrentBucketStartTimeNs;
    511     if (eventTimeNs < fullBucketEndTimeNs) {
    512         info.mBucketEndNs = eventTimeNs;
    513     } else {
    514         info.mBucketEndNs = fullBucketEndTimeNs;
    515     }
    516 
    517     if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) {
    518         for (const auto& slice : *mCurrentSlicedBucket) {
    519             info.mGaugeAtoms = slice.second;
    520             auto& bucketList = mPastBuckets[slice.first];
    521             bucketList.push_back(info);
    522             VLOG("Gauge gauge metric %lld, dump key value: %s", (long long)mMetricId,
    523                  slice.first.toString().c_str());
    524         }
    525     } else {
    526         mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs);
    527     }
    528 
    529     // If we have anomaly trackers, we need to update the partial bucket values.
    530     if (mAnomalyTrackers.size() > 0) {
    531         updateCurrentSlicedBucketForAnomaly();
    532 
    533         if (eventTimeNs > fullBucketEndTimeNs) {
    534             // This is known to be a full bucket, so send this data to the anomaly tracker.
    535             for (auto& tracker : mAnomalyTrackers) {
    536                 tracker->addPastBucket(mCurrentSlicedBucketForAnomaly, mCurrentBucketNum);
    537             }
    538             mCurrentSlicedBucketForAnomaly = std::make_shared<DimToValMap>();
    539         }
    540     }
    541 
    542     mCurrentSlicedBucket = std::make_shared<DimToGaugeAtomsMap>();
    543 }
    544 
    545 size_t GaugeMetricProducer::byteSizeLocked() const {
    546     size_t totalSize = 0;
    547     for (const auto& pair : mPastBuckets) {
    548         for (const auto& bucket : pair.second) {
    549             totalSize += bucket.mGaugeAtoms.size() * sizeof(GaugeAtom);
    550             for (const auto& atom : bucket.mGaugeAtoms) {
    551                 if (atom.mFields != nullptr) {
    552                     totalSize += atom.mFields->size() * sizeof(FieldValue);
    553                 }
    554             }
    555         }
    556     }
    557     return totalSize;
    558 }
    559 
    560 }  // namespace statsd
    561 }  // namespace os
    562 }  // namespace android
    563