Home | History | Annotate | Download | only in load_reporter
      1 /*
      2  *
      3  * Copyright 2018 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H
     20 #define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H
     21 
#include <grpc/support/port_platform.h>

#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <utility>
#include <vector>

#include <grpc/support/log.h>
#include <grpcpp/impl/codegen/config.h>

#include "src/cpp/server/load_reporter/load_data_store.h"
#include "src/proto/grpc/lb/v1/load_reporter.grpc.pb.h"

#include "opencensus/stats/stats.h"
     36 
     37 namespace grpc {
     38 namespace load_reporter {
     39 
     40 // The interface to get the Census stats. Abstracted for mocking.
     41 class CensusViewProvider {
     42  public:
     43   // Maps from the view name to the view data.
     44   using ViewDataMap =
     45       std::unordered_map<grpc::string, ::opencensus::stats::ViewData>;
     46   // Maps from the view name to the view descriptor.
     47   using ViewDescriptorMap =
     48       std::unordered_map<grpc::string, ::opencensus::stats::ViewDescriptor>;
     49 
     50   CensusViewProvider();
     51   virtual ~CensusViewProvider() = default;
     52 
     53   // Fetches the view data accumulated since last fetching, and returns it as a
     54   // map from the view name to the view data.
     55   virtual ViewDataMap FetchViewData() = 0;
     56 
     57   // A helper function that gets a row with the input tag values from the view
     58   // data. Only used when we know that row must exist because we have seen a row
     59   // with the same tag values in a related view data. Several ViewData's are
     60   // considered related if their views are based on the measures that are always
     61   // recorded at the same time.
     62   static double GetRelatedViewDataRowDouble(
     63       const ViewDataMap& view_data_map, const char* view_name,
     64       size_t view_name_len, const std::vector<grpc::string>& tag_values);
     65   static uint64_t GetRelatedViewDataRowInt(
     66       const ViewDataMap& view_data_map, const char* view_name,
     67       size_t view_name_len, const std::vector<grpc::string>& tag_values);
     68 
     69  protected:
     70   const ViewDescriptorMap& view_descriptor_map() const {
     71     return view_descriptor_map_;
     72   }
     73 
     74  private:
     75   ViewDescriptorMap view_descriptor_map_;
     76   // Tag keys.
     77   ::opencensus::stats::TagKey tag_key_token_;
     78   ::opencensus::stats::TagKey tag_key_host_;
     79   ::opencensus::stats::TagKey tag_key_user_id_;
     80   ::opencensus::stats::TagKey tag_key_status_;
     81   ::opencensus::stats::TagKey tag_key_metric_name_;
     82 };
     83 
     84 // The default implementation fetches the real stats from Census.
     85 class CensusViewProviderDefaultImpl : public CensusViewProvider {
     86  public:
     87   CensusViewProviderDefaultImpl();
     88 
     89   ViewDataMap FetchViewData() override;
     90 
     91  private:
     92   std::unordered_map<grpc::string, ::opencensus::stats::View> view_map_;
     93 };
     94 
     95 // The interface to get the CPU stats. Abstracted for mocking.
     96 class CpuStatsProvider {
     97  public:
     98   // The used and total amounts of CPU usage.
     99   using CpuStatsSample = std::pair<uint64_t, uint64_t>;
    100 
    101   virtual ~CpuStatsProvider() = default;
    102 
    103   // Gets the cumulative used CPU and total CPU resource.
    104   virtual CpuStatsSample GetCpuStats() = 0;
    105 };
    106 
    107 // The default implementation reads CPU jiffies from the system to calculate CPU
    108 // utilization.
    109 class CpuStatsProviderDefaultImpl : public CpuStatsProvider {
    110  public:
    111   CpuStatsSample GetCpuStats() override;
    112 };
    113 
    114 // Maintains all the load data and load reporting streams.
    115 class LoadReporter {
    116  public:
    117   // TODO(juanlishen): Allow config for providers from users.
    118   LoadReporter(uint32_t feedback_sample_window_seconds,
    119                std::unique_ptr<CensusViewProvider> census_view_provider,
    120                std::unique_ptr<CpuStatsProvider> cpu_stats_provider)
    121       : feedback_sample_window_seconds_(feedback_sample_window_seconds),
    122         census_view_provider_(std::move(census_view_provider)),
    123         cpu_stats_provider_(std::move(cpu_stats_provider)) {
    124     // Append the initial record so that the next real record can have a base.
    125     AppendNewFeedbackRecord(0, 0);
    126   }
    127 
    128   // Fetches the latest data from Census and merge it into the data store.
    129   // Also adds a new sample to the LB feedback sliding window.
    130   // Thread-unsafe. (1). The access to the load data store and feedback records
    131   // has locking. (2). The access to the Census view provider and CPU stats
    132   // provider lacks locking, but we only access these two members in this method
    133   // (in testing, we also access them when setting up expectation). So the
    134   // invocations of this method must be serialized.
    135   void FetchAndSample();
    136 
    137   // Generates a report for that host and balancer. The report contains
    138   // all the stats data accumulated between the last report (i.e., the last
    139   // consumption) and the last fetch from Census (i.e., the last production).
    140   // Thread-safe.
    141   ::google::protobuf::RepeatedPtrField<::grpc::lb::v1::Load> GenerateLoads(
    142       const grpc::string& hostname, const grpc::string& lb_id);
    143 
    144   // The feedback is calculated from the stats data recorded in the sliding
    145   // window. Outdated records are discarded.
    146   // Thread-safe.
    147   ::grpc::lb::v1::LoadBalancingFeedback GenerateLoadBalancingFeedback();
    148 
    149   // Wrapper around LoadDataStore::ReportStreamCreated.
    150   // Thread-safe.
    151   void ReportStreamCreated(const grpc::string& hostname,
    152                            const grpc::string& lb_id,
    153                            const grpc::string& load_key);
    154 
    155   // Wrapper around LoadDataStore::ReportStreamClosed.
    156   // Thread-safe.
    157   void ReportStreamClosed(const grpc::string& hostname,
    158                           const grpc::string& lb_id);
    159 
    160   // Generates a unique LB ID of length kLbIdLength. Returns an empty string
    161   // upon failure. Thread-safe.
    162   grpc::string GenerateLbId();
    163 
    164   // Accessors only for testing.
    165   CensusViewProvider* census_view_provider() {
    166     return census_view_provider_.get();
    167   }
    168   CpuStatsProvider* cpu_stats_provider() { return cpu_stats_provider_.get(); }
    169 
    170  private:
    171   struct LoadBalancingFeedbackRecord {
    172     std::chrono::system_clock::time_point end_time;
    173     uint64_t rpcs;
    174     uint64_t errors;
    175     uint64_t cpu_usage;
    176     uint64_t cpu_limit;
    177 
    178     LoadBalancingFeedbackRecord(
    179         const std::chrono::system_clock::time_point& end_time, uint64_t rpcs,
    180         uint64_t errors, uint64_t cpu_usage, uint64_t cpu_limit)
    181         : end_time(end_time),
    182           rpcs(rpcs),
    183           errors(errors),
    184           cpu_usage(cpu_usage),
    185           cpu_limit(cpu_limit) {}
    186   };
    187 
    188   // Finds the view data about starting call from the view_data_map and merges
    189   // the data to the load data store.
    190   void ProcessViewDataCallStart(
    191       const CensusViewProvider::ViewDataMap& view_data_map);
    192   // Finds the view data about ending call from the view_data_map and merges the
    193   // data to the load data store.
    194   void ProcessViewDataCallEnd(
    195       const CensusViewProvider::ViewDataMap& view_data_map);
    196   // Finds the view data about the customized call metrics from the
    197   // view_data_map and merges the data to the load data store.
    198   void ProcessViewDataOtherCallMetrics(
    199       const CensusViewProvider::ViewDataMap& view_data_map);
    200 
    201   bool IsRecordInWindow(const LoadBalancingFeedbackRecord& record,
    202                         std::chrono::system_clock::time_point now) {
    203     return record.end_time > now - feedback_sample_window_seconds_;
    204   }
    205 
    206   void AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors);
    207 
    208   // Extracts an OrphanedLoadIdentifier from the per-balancer store and attaches
    209   // it to the load.
    210   void AttachOrphanLoadId(::grpc::lb::v1::Load* load,
    211                           const PerBalancerStore& per_balancer_store);
    212 
    213   std::atomic<int64_t> next_lb_id_{0};
    214   const std::chrono::seconds feedback_sample_window_seconds_;
    215   std::mutex feedback_mu_;
    216   std::deque<LoadBalancingFeedbackRecord> feedback_records_;
    217   // TODO(juanlishen): Lock in finer grain. Locking the whole store may be
    218   // too expensive.
    219   std::mutex store_mu_;
    220   LoadDataStore load_data_store_;
    221   std::unique_ptr<CensusViewProvider> census_view_provider_;
    222   std::unique_ptr<CpuStatsProvider> cpu_stats_provider_;
    223 };
    224 
    225 }  // namespace load_reporter
    226 }  // namespace grpc
    227 
    228 #endif  // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H
    229