Home | History | Annotate | Download | only in load_reporter
      1 /*
      2  *
      3  * Copyright 2018 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #include <grpc/impl/codegen/port_platform.h>
     20 
     21 #include <stdint.h>
     22 #include <stdio.h>
     23 #include <chrono>
     24 #include <ctime>
     25 #include <iterator>
     26 
     27 #include "src/cpp/server/load_reporter/constants.h"
     28 #include "src/cpp/server/load_reporter/get_cpu_stats.h"
     29 #include "src/cpp/server/load_reporter/load_reporter.h"
     30 
     31 #include "opencensus/stats/internal/set_aggregation_window.h"
     32 
     33 namespace grpc {
     34 namespace load_reporter {
     35 
     36 CpuStatsProvider::CpuStatsSample CpuStatsProviderDefaultImpl::GetCpuStats() {
     37   return GetCpuStatsImpl();
     38 }
     39 
     40 CensusViewProvider::CensusViewProvider()
     41     : tag_key_token_(::opencensus::stats::TagKey::Register(kTagKeyToken)),
     42       tag_key_host_(::opencensus::stats::TagKey::Register(kTagKeyHost)),
     43       tag_key_user_id_(::opencensus::stats::TagKey::Register(kTagKeyUserId)),
     44       tag_key_status_(::opencensus::stats::TagKey::Register(kTagKeyStatus)),
     45       tag_key_metric_name_(
     46           ::opencensus::stats::TagKey::Register(kTagKeyMetricName)) {
     47   // One view related to starting a call.
     48   auto vd_start_count =
     49       ::opencensus::stats::ViewDescriptor()
     50           .set_name(kViewStartCount)
     51           .set_measure(kMeasureStartCount)
     52           .set_aggregation(::opencensus::stats::Aggregation::Sum())
     53           .add_column(tag_key_token_)
     54           .add_column(tag_key_host_)
     55           .add_column(tag_key_user_id_)
     56           .set_description(
     57               "Delta count of calls started broken down by <token, host, "
     58               "user_id>.");
     59   ::opencensus::stats::SetAggregationWindow(
     60       ::opencensus::stats::AggregationWindow::Delta(), &vd_start_count);
     61   view_descriptor_map_.emplace(kViewStartCount, vd_start_count);
     62   // Four views related to ending a call.
     63   // If this view is set as Count of kMeasureEndBytesSent (in hope of saving one
     64   // measure), it's infeasible to prepare fake data for testing. That's because
     65   // the OpenCensus API to make up view data will add the input data as separate
     66   // measurements instead of setting the data values directly.
     67   auto vd_end_count =
     68       ::opencensus::stats::ViewDescriptor()
     69           .set_name(kViewEndCount)
     70           .set_measure(kMeasureEndCount)
     71           .set_aggregation(::opencensus::stats::Aggregation::Sum())
     72           .add_column(tag_key_token_)
     73           .add_column(tag_key_host_)
     74           .add_column(tag_key_user_id_)
     75           .add_column(tag_key_status_)
     76           .set_description(
     77               "Delta count of calls ended broken down by <token, host, "
     78               "user_id, status>.");
     79   ::opencensus::stats::SetAggregationWindow(
     80       ::opencensus::stats::AggregationWindow::Delta(), &vd_end_count);
     81   view_descriptor_map_.emplace(kViewEndCount, vd_end_count);
     82   auto vd_end_bytes_sent =
     83       ::opencensus::stats::ViewDescriptor()
     84           .set_name(kViewEndBytesSent)
     85           .set_measure(kMeasureEndBytesSent)
     86           .set_aggregation(::opencensus::stats::Aggregation::Sum())
     87           .add_column(tag_key_token_)
     88           .add_column(tag_key_host_)
     89           .add_column(tag_key_user_id_)
     90           .add_column(tag_key_status_)
     91           .set_description(
     92               "Delta sum of bytes sent broken down by <token, host, user_id, "
     93               "status>.");
     94   ::opencensus::stats::SetAggregationWindow(
     95       ::opencensus::stats::AggregationWindow::Delta(), &vd_end_bytes_sent);
     96   view_descriptor_map_.emplace(kViewEndBytesSent, vd_end_bytes_sent);
     97   auto vd_end_bytes_received =
     98       ::opencensus::stats::ViewDescriptor()
     99           .set_name(kViewEndBytesReceived)
    100           .set_measure(kMeasureEndBytesReceived)
    101           .set_aggregation(::opencensus::stats::Aggregation::Sum())
    102           .add_column(tag_key_token_)
    103           .add_column(tag_key_host_)
    104           .add_column(tag_key_user_id_)
    105           .add_column(tag_key_status_)
    106           .set_description(
    107               "Delta sum of bytes received broken down by <token, host, "
    108               "user_id, status>.");
    109   ::opencensus::stats::SetAggregationWindow(
    110       ::opencensus::stats::AggregationWindow::Delta(), &vd_end_bytes_received);
    111   view_descriptor_map_.emplace(kViewEndBytesReceived, vd_end_bytes_received);
    112   auto vd_end_latency_ms =
    113       ::opencensus::stats::ViewDescriptor()
    114           .set_name(kViewEndLatencyMs)
    115           .set_measure(kMeasureEndLatencyMs)
    116           .set_aggregation(::opencensus::stats::Aggregation::Sum())
    117           .add_column(tag_key_token_)
    118           .add_column(tag_key_host_)
    119           .add_column(tag_key_user_id_)
    120           .add_column(tag_key_status_)
    121           .set_description(
    122               "Delta sum of latency in ms broken down by <token, host, "
    123               "user_id, status>.");
    124   ::opencensus::stats::SetAggregationWindow(
    125       ::opencensus::stats::AggregationWindow::Delta(), &vd_end_latency_ms);
    126   view_descriptor_map_.emplace(kViewEndLatencyMs, vd_end_latency_ms);
    127   // Two views related to other call metrics.
    128   auto vd_metric_call_count =
    129       ::opencensus::stats::ViewDescriptor()
    130           .set_name(kViewOtherCallMetricCount)
    131           .set_measure(kMeasureOtherCallMetric)
    132           .set_aggregation(::opencensus::stats::Aggregation::Count())
    133           .add_column(tag_key_token_)
    134           .add_column(tag_key_host_)
    135           .add_column(tag_key_user_id_)
    136           .add_column(tag_key_metric_name_)
    137           .set_description(
    138               "Delta count of calls broken down by <token, host, user_id, "
    139               "metric_name>.");
    140   ::opencensus::stats::SetAggregationWindow(
    141       ::opencensus::stats::AggregationWindow::Delta(), &vd_metric_call_count);
    142   view_descriptor_map_.emplace(kViewOtherCallMetricCount, vd_metric_call_count);
    143   auto vd_metric_value =
    144       ::opencensus::stats::ViewDescriptor()
    145           .set_name(kViewOtherCallMetricValue)
    146           .set_measure(kMeasureOtherCallMetric)
    147           .set_aggregation(::opencensus::stats::Aggregation::Sum())
    148           .add_column(tag_key_token_)
    149           .add_column(tag_key_host_)
    150           .add_column(tag_key_user_id_)
    151           .add_column(tag_key_metric_name_)
    152           .set_description(
    153               "Delta sum of call metric value broken down "
    154               "by <token, host, user_id, metric_name>.");
    155   ::opencensus::stats::SetAggregationWindow(
    156       ::opencensus::stats::AggregationWindow::Delta(), &vd_metric_value);
    157   view_descriptor_map_.emplace(kViewOtherCallMetricValue, vd_metric_value);
    158 }
    159 
    160 double CensusViewProvider::GetRelatedViewDataRowDouble(
    161     const ViewDataMap& view_data_map, const char* view_name,
    162     size_t view_name_len, const std::vector<grpc::string>& tag_values) {
    163   auto it_vd = view_data_map.find(grpc::string(view_name, view_name_len));
    164   GPR_ASSERT(it_vd != view_data_map.end());
    165   GPR_ASSERT(it_vd->second.type() ==
    166              ::opencensus::stats::ViewData::Type::kDouble);
    167   auto it_row = it_vd->second.double_data().find(tag_values);
    168   GPR_ASSERT(it_row != it_vd->second.double_data().end());
    169   return it_row->second;
    170 }
    171 
    172 uint64_t CensusViewProvider::GetRelatedViewDataRowInt(
    173     const ViewDataMap& view_data_map, const char* view_name,
    174     size_t view_name_len, const std::vector<grpc::string>& tag_values) {
    175   auto it_vd = view_data_map.find(grpc::string(view_name, view_name_len));
    176   GPR_ASSERT(it_vd != view_data_map.end());
    177   GPR_ASSERT(it_vd->second.type() ==
    178              ::opencensus::stats::ViewData::Type::kInt64);
    179   auto it_row = it_vd->second.int_data().find(tag_values);
    180   GPR_ASSERT(it_row != it_vd->second.int_data().end());
    181   GPR_ASSERT(it_row->second >= 0);
    182   return it_row->second;
    183 }
    184 
    185 CensusViewProviderDefaultImpl::CensusViewProviderDefaultImpl() {
    186   for (const auto& p : view_descriptor_map()) {
    187     const grpc::string& view_name = p.first;
    188     const ::opencensus::stats::ViewDescriptor& vd = p.second;
    189     // We need to use pair's piecewise ctor here, otherwise the deleted copy
    190     // ctor of View will be called.
    191     view_map_.emplace(std::piecewise_construct,
    192                       std::forward_as_tuple(view_name),
    193                       std::forward_as_tuple(vd));
    194   }
    195 }
    196 
    197 CensusViewProvider::ViewDataMap CensusViewProviderDefaultImpl::FetchViewData() {
    198   gpr_log(GPR_DEBUG, "[CVP %p] Starts fetching Census view data.", this);
    199   ViewDataMap view_data_map;
    200   for (auto& p : view_map_) {
    201     const grpc::string& view_name = p.first;
    202     ::opencensus::stats::View& view = p.second;
    203     if (view.IsValid()) {
    204       view_data_map.emplace(view_name, view.GetData());
    205       gpr_log(GPR_DEBUG, "[CVP %p] Fetched view data (view: %s).", this,
    206               view_name.c_str());
    207     } else {
    208       gpr_log(
    209           GPR_DEBUG,
    210           "[CVP %p] Can't fetch view data because view is invalid (view: %s).",
    211           this, view_name.c_str());
    212     }
    213   }
    214   return view_data_map;
    215 }
    216 
    217 grpc::string LoadReporter::GenerateLbId() {
    218   while (true) {
    219     if (next_lb_id_ > UINT32_MAX) {
    220       gpr_log(GPR_ERROR, "[LR %p] The LB ID exceeds the max valid value!",
    221               this);
    222       return "";
    223     }
    224     int64_t lb_id = next_lb_id_++;
    225     // Overflow should never happen.
    226     GPR_ASSERT(lb_id >= 0);
    227     // Convert to padded hex string for a 32-bit LB ID. E.g, "0000ca5b".
    228     char buf[kLbIdLength + 1];
    229     snprintf(buf, sizeof(buf), "%08lx", lb_id);
    230     grpc::string lb_id_str(buf, kLbIdLength);
    231     // The client may send requests with LB ID that has never been allocated
    232     // by this load reporter. Those IDs are tracked and will be skipped when
    233     // we generate a new ID.
    234     if (!load_data_store_.IsTrackedUnknownBalancerId(lb_id_str)) {
    235       return lb_id_str;
    236     }
    237   }
    238 }
    239 
    240 ::grpc::lb::v1::LoadBalancingFeedback
    241 LoadReporter::GenerateLoadBalancingFeedback() {
    242   std::unique_lock<std::mutex> lock(feedback_mu_);
    243   auto now = std::chrono::system_clock::now();
    244   // Discard records outside the window until there is only one record
    245   // outside the window, which is used as the base for difference.
    246   while (feedback_records_.size() > 1 &&
    247          !IsRecordInWindow(feedback_records_[1], now)) {
    248     feedback_records_.pop_front();
    249   }
    250   if (feedback_records_.size() < 2) {
    251     return ::grpc::lb::v1::LoadBalancingFeedback::default_instance();
    252   }
    253   // Find the longest range with valid ends.
    254   auto oldest = feedback_records_.begin();
    255   auto newest = feedback_records_.end() - 1;
    256   while (std::distance(oldest, newest) > 0 &&
    257          (newest->cpu_limit == 0 || oldest->cpu_limit == 0)) {
    258     // A zero limit means that the system info reading was failed, so these
    259     // records can't be used to calculate CPU utilization.
    260     if (newest->cpu_limit == 0) --newest;
    261     if (oldest->cpu_limit == 0) ++oldest;
    262   }
    263   if (std::distance(oldest, newest) < 1 ||
    264       oldest->end_time == newest->end_time ||
    265       newest->cpu_limit == oldest->cpu_limit) {
    266     return ::grpc::lb::v1::LoadBalancingFeedback::default_instance();
    267   }
    268   uint64_t rpcs = 0;
    269   uint64_t errors = 0;
    270   for (auto p = newest; p != oldest; --p) {
    271     // Because these two numbers are counters, the oldest record shouldn't be
    272     // included.
    273     rpcs += p->rpcs;
    274     errors += p->errors;
    275   }
    276   double cpu_usage = newest->cpu_usage - oldest->cpu_usage;
    277   double cpu_limit = newest->cpu_limit - oldest->cpu_limit;
    278   std::chrono::duration<double> duration_seconds =
    279       newest->end_time - oldest->end_time;
    280   lock.unlock();
    281   ::grpc::lb::v1::LoadBalancingFeedback feedback;
    282   feedback.set_server_utilization(static_cast<float>(cpu_usage / cpu_limit));
    283   feedback.set_calls_per_second(
    284       static_cast<float>(rpcs / duration_seconds.count()));
    285   feedback.set_errors_per_second(
    286       static_cast<float>(errors / duration_seconds.count()));
    287   return feedback;
    288 }
    289 
    290 ::google::protobuf::RepeatedPtrField<::grpc::lb::v1::Load>
    291 LoadReporter::GenerateLoads(const grpc::string& hostname,
    292                             const grpc::string& lb_id) {
    293   std::lock_guard<std::mutex> lock(store_mu_);
    294   auto assigned_stores = load_data_store_.GetAssignedStores(hostname, lb_id);
    295   GPR_ASSERT(assigned_stores != nullptr);
    296   GPR_ASSERT(!assigned_stores->empty());
    297   ::google::protobuf::RepeatedPtrField<::grpc::lb::v1::Load> loads;
    298   for (PerBalancerStore* per_balancer_store : *assigned_stores) {
    299     GPR_ASSERT(!per_balancer_store->IsSuspended());
    300     if (!per_balancer_store->load_record_map().empty()) {
    301       for (const auto& p : per_balancer_store->load_record_map()) {
    302         const auto& key = p.first;
    303         const auto& value = p.second;
    304         auto load = loads.Add();
    305         load->set_load_balance_tag(key.lb_tag());
    306         load->set_user_id(key.user_id());
    307         load->set_client_ip_address(key.GetClientIpBytes());
    308         load->set_num_calls_started(static_cast<int64_t>(value.start_count()));
    309         load->set_num_calls_finished_without_error(
    310             static_cast<int64_t>(value.ok_count()));
    311         load->set_num_calls_finished_with_error(
    312             static_cast<int64_t>(value.error_count()));
    313         load->set_total_bytes_sent(static_cast<int64_t>(value.bytes_sent()));
    314         load->set_total_bytes_received(
    315             static_cast<int64_t>(value.bytes_recv()));
    316         load->mutable_total_latency()->set_seconds(
    317             static_cast<int64_t>(value.latency_ms() / 1000));
    318         load->mutable_total_latency()->set_nanos(
    319             (static_cast<int32_t>(value.latency_ms()) % 1000) * 1000000);
    320         for (const auto& p : value.call_metrics()) {
    321           const grpc::string& metric_name = p.first;
    322           const CallMetricValue& metric_value = p.second;
    323           auto call_metric_data = load->add_metric_data();
    324           call_metric_data->set_metric_name(metric_name);
    325           call_metric_data->set_num_calls_finished_with_metric(
    326               metric_value.num_calls());
    327           call_metric_data->set_total_metric_value(
    328               metric_value.total_metric_value());
    329         }
    330         if (per_balancer_store->lb_id() != lb_id) {
    331           // This per-balancer store is an orphan assigned to this receiving
    332           // balancer.
    333           AttachOrphanLoadId(load, *per_balancer_store);
    334         }
    335       }
    336       per_balancer_store->ClearLoadRecordMap();
    337     }
    338     if (per_balancer_store->IsNumCallsInProgressChangedSinceLastReport()) {
    339       auto load = loads.Add();
    340       load->set_num_calls_in_progress(
    341           per_balancer_store->GetNumCallsInProgressForReport());
    342       if (per_balancer_store->lb_id() != lb_id) {
    343         // This per-balancer store is an orphan assigned to this receiving
    344         // balancer.
    345         AttachOrphanLoadId(load, *per_balancer_store);
    346       }
    347     }
    348   }
    349   return loads;
    350 }
    351 
    352 void LoadReporter::AttachOrphanLoadId(
    353     ::grpc::lb::v1::Load* load, const PerBalancerStore& per_balancer_store) {
    354   if (per_balancer_store.lb_id() == kInvalidLbId) {
    355     load->set_load_key_unknown(true);
    356   } else {
    357     // We shouldn't set load_key_unknown to any value in this case because
    358     // load_key_unknown and orphaned_load_identifier are under an oneof struct.
    359     load->mutable_orphaned_load_identifier()->set_load_key(
    360         per_balancer_store.load_key());
    361     load->mutable_orphaned_load_identifier()->set_load_balancer_id(
    362         per_balancer_store.lb_id());
    363   }
    364 }
    365 
    366 void LoadReporter::AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors) {
    367   CpuStatsProvider::CpuStatsSample cpu_stats;
    368   if (cpu_stats_provider_ != nullptr) {
    369     cpu_stats = cpu_stats_provider_->GetCpuStats();
    370   } else {
    371     // This will make the load balancing feedback generation a no-op.
    372     cpu_stats = {0, 0};
    373   }
    374   std::unique_lock<std::mutex> lock(feedback_mu_);
    375   feedback_records_.emplace_back(std::chrono::system_clock::now(), rpcs, errors,
    376                                  cpu_stats.first, cpu_stats.second);
    377 }
    378 
    379 void LoadReporter::ReportStreamCreated(const grpc::string& hostname,
    380                                        const grpc::string& lb_id,
    381                                        const grpc::string& load_key) {
    382   std::lock_guard<std::mutex> lock(store_mu_);
    383   load_data_store_.ReportStreamCreated(hostname, lb_id, load_key);
    384   gpr_log(GPR_INFO,
    385           "[LR %p] Report stream created (host: %s, LB ID: %s, load key: %s).",
    386           this, hostname.c_str(), lb_id.c_str(), load_key.c_str());
    387 }
    388 
    389 void LoadReporter::ReportStreamClosed(const grpc::string& hostname,
    390                                       const grpc::string& lb_id) {
    391   std::lock_guard<std::mutex> lock(store_mu_);
    392   load_data_store_.ReportStreamClosed(hostname, lb_id);
    393   gpr_log(GPR_INFO, "[LR %p] Report stream closed (host: %s, LB ID: %s).", this,
    394           hostname.c_str(), lb_id.c_str());
    395 }
    396 
    397 void LoadReporter::ProcessViewDataCallStart(
    398     const CensusViewProvider::ViewDataMap& view_data_map) {
    399   auto it = view_data_map.find(kViewStartCount);
    400   if (it != view_data_map.end()) {
    401     for (const auto& p : it->second.int_data()) {
    402       const std::vector<grpc::string>& tag_values = p.first;
    403       const uint64_t start_count = static_cast<uint64_t>(p.second);
    404       const grpc::string& client_ip_and_token = tag_values[0];
    405       const grpc::string& host = tag_values[1];
    406       const grpc::string& user_id = tag_values[2];
    407       LoadRecordKey key(client_ip_and_token, user_id);
    408       LoadRecordValue value = LoadRecordValue(start_count);
    409       {
    410         std::unique_lock<std::mutex> lock(store_mu_);
    411         load_data_store_.MergeRow(host, key, value);
    412       }
    413     }
    414   }
    415 }
    416 
    417 void LoadReporter::ProcessViewDataCallEnd(
    418     const CensusViewProvider::ViewDataMap& view_data_map) {
    419   uint64_t total_end_count = 0;
    420   uint64_t total_error_count = 0;
    421   auto it = view_data_map.find(kViewEndCount);
    422   if (it != view_data_map.end()) {
    423     for (const auto& p : it->second.int_data()) {
    424       const std::vector<grpc::string>& tag_values = p.first;
    425       const uint64_t end_count = static_cast<uint64_t>(p.second);
    426       const grpc::string& client_ip_and_token = tag_values[0];
    427       const grpc::string& host = tag_values[1];
    428       const grpc::string& user_id = tag_values[2];
    429       const grpc::string& status = tag_values[3];
    430       // This is due to a bug reported internally of Java server load reporting
    431       // implementation.
    432       // TODO(juanlishen): Check whether this situation happens in OSS C++.
    433       if (client_ip_and_token.size() == 0) {
    434         gpr_log(GPR_DEBUG,
    435                 "Skipping processing Opencensus record with empty "
    436                 "client_ip_and_token tag.");
    437         continue;
    438       }
    439       LoadRecordKey key(client_ip_and_token, user_id);
    440       const uint64_t bytes_sent = CensusViewProvider::GetRelatedViewDataRowInt(
    441           view_data_map, kViewEndBytesSent, sizeof(kViewEndBytesSent) - 1,
    442           tag_values);
    443       const uint64_t bytes_received =
    444           CensusViewProvider::GetRelatedViewDataRowInt(
    445               view_data_map, kViewEndBytesReceived,
    446               sizeof(kViewEndBytesReceived) - 1, tag_values);
    447       const uint64_t latency_ms = CensusViewProvider::GetRelatedViewDataRowInt(
    448           view_data_map, kViewEndLatencyMs, sizeof(kViewEndLatencyMs) - 1,
    449           tag_values);
    450       uint64_t ok_count = 0;
    451       uint64_t error_count = 0;
    452       total_end_count += end_count;
    453       if (std::strcmp(status.c_str(), kCallStatusOk) == 0) {
    454         ok_count = end_count;
    455       } else {
    456         error_count = end_count;
    457         total_error_count += end_count;
    458       }
    459       LoadRecordValue value = LoadRecordValue(
    460           0, ok_count, error_count, bytes_sent, bytes_received, latency_ms);
    461       {
    462         std::unique_lock<std::mutex> lock(store_mu_);
    463         load_data_store_.MergeRow(host, key, value);
    464       }
    465     }
    466   }
    467   AppendNewFeedbackRecord(total_end_count, total_error_count);
    468 }
    469 
    470 void LoadReporter::ProcessViewDataOtherCallMetrics(
    471     const CensusViewProvider::ViewDataMap& view_data_map) {
    472   auto it = view_data_map.find(kViewOtherCallMetricCount);
    473   if (it != view_data_map.end()) {
    474     for (const auto& p : it->second.int_data()) {
    475       const std::vector<grpc::string>& tag_values = p.first;
    476       const int64_t num_calls = p.second;
    477       const grpc::string& client_ip_and_token = tag_values[0];
    478       const grpc::string& host = tag_values[1];
    479       const grpc::string& user_id = tag_values[2];
    480       const grpc::string& metric_name = tag_values[3];
    481       LoadRecordKey key(client_ip_and_token, user_id);
    482       const double total_metric_value =
    483           CensusViewProvider::GetRelatedViewDataRowDouble(
    484               view_data_map, kViewOtherCallMetricValue,
    485               sizeof(kViewOtherCallMetricValue) - 1, tag_values);
    486       LoadRecordValue value = LoadRecordValue(
    487           metric_name, static_cast<uint64_t>(num_calls), total_metric_value);
    488       {
    489         std::unique_lock<std::mutex> lock(store_mu_);
    490         load_data_store_.MergeRow(host, key, value);
    491       }
    492     }
    493   }
    494 }
    495 
    496 void LoadReporter::FetchAndSample() {
    497   gpr_log(GPR_DEBUG,
    498           "[LR %p] Starts fetching Census view data and sampling LB feedback "
    499           "record.",
    500           this);
    501   CensusViewProvider::ViewDataMap view_data_map =
    502       census_view_provider_->FetchViewData();
    503   ProcessViewDataCallStart(view_data_map);
    504   ProcessViewDataCallEnd(view_data_map);
    505   ProcessViewDataOtherCallMetrics(view_data_map);
    506 }
    507 
    508 }  // namespace load_reporter
    509 }  // namespace grpc
    510