Home | History | Annotate | Download | only in load_reporter
      1 /*
      2  *
      3  * Copyright 2018 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_DATA_STORE_H
     20 #define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_DATA_STORE_H
     21 
     22 #include <grpc/support/port_platform.h>
     23 
     24 #include <memory>
     25 #include <set>
     26 #include <unordered_map>
     27 
     28 #include <grpc/support/log.h>
     29 #include <grpcpp/impl/codegen/config.h>
     30 
     31 #include "src/cpp/server/load_reporter/constants.h"
     32 
     33 namespace grpc {
     34 namespace load_reporter {
     35 
// The load data storage is organized as a hierarchy. LoadDataStore is the
// top-level data store. In LoadDataStore, for each host we keep a
// PerHostStore, in which for each balancer we keep a PerBalancerStore. Each
// PerBalancerStore maintains a map of load records, mapping from LoadRecordKey
// to LoadRecordValue. Each LoadRecordValue contains a map of customized call
// metrics, mapping from a call metric name to its CallMetricValue.
     42 
     43 // The value of a customized call metric.
     44 class CallMetricValue {
     45  public:
     46   explicit CallMetricValue(uint64_t num_calls = 0,
     47                            double total_metric_value = 0)
     48       : num_calls_(num_calls), total_metric_value_(total_metric_value) {}
     49 
     50   void MergeFrom(CallMetricValue other) {
     51     num_calls_ += other.num_calls_;
     52     total_metric_value_ += other.total_metric_value_;
     53   }
     54 
     55   // Getters.
     56   uint64_t num_calls() const { return num_calls_; }
     57   double total_metric_value() const { return total_metric_value_; }
     58 
     59  private:
     60   // The number of calls that finished with this metric.
     61   uint64_t num_calls_ = 0;
     62   // The sum of metric values across all the calls that finished with this
     63   // metric.
     64   double total_metric_value_ = 0;
     65 };
     66 
     67 // The key of a load record.
     68 class LoadRecordKey {
     69  public:
     70   LoadRecordKey(grpc::string lb_id, grpc::string lb_tag, grpc::string user_id,
     71                 grpc::string client_ip_hex)
     72       : lb_id_(std::move(lb_id)),
     73         lb_tag_(std::move(lb_tag)),
     74         user_id_(std::move(user_id)),
     75         client_ip_hex_(std::move(client_ip_hex)) {}
     76 
     77   // Parses the input client_ip_and_token to set client IP, LB ID, and LB tag.
     78   LoadRecordKey(const grpc::string& client_ip_and_token, grpc::string user_id);
     79 
     80   grpc::string ToString() const {
     81     return "[lb_id_=" + lb_id_ + ", lb_tag_=" + lb_tag_ +
     82            ", user_id_=" + user_id_ + ", client_ip_hex_=" + client_ip_hex_ +
     83            "]";
     84   }
     85 
     86   bool operator==(const LoadRecordKey& other) const {
     87     return lb_id_ == other.lb_id_ && lb_tag_ == other.lb_tag_ &&
     88            user_id_ == other.user_id_ && client_ip_hex_ == other.client_ip_hex_;
     89   }
     90 
     91   // Gets the client IP bytes in network order (i.e., big-endian).
     92   grpc::string GetClientIpBytes() const;
     93 
     94   // Getters.
     95   const grpc::string& lb_id() const { return lb_id_; }
     96   const grpc::string& lb_tag() const { return lb_tag_; }
     97   const grpc::string& user_id() const { return user_id_; }
     98   const grpc::string& client_ip_hex() const { return client_ip_hex_; }
     99 
    100   struct Hasher {
    101     void hash_combine(size_t* seed, const grpc::string& k) const {
    102       *seed ^= std::hash<grpc::string>()(k) + 0x9e3779b9 + (*seed << 6) +
    103                (*seed >> 2);
    104     }
    105 
    106     size_t operator()(const LoadRecordKey& k) const {
    107       size_t h = 0;
    108       hash_combine(&h, k.lb_id_);
    109       hash_combine(&h, k.lb_tag_);
    110       hash_combine(&h, k.user_id_);
    111       hash_combine(&h, k.client_ip_hex_);
    112       return h;
    113     }
    114   };
    115 
    116  private:
    117   grpc::string lb_id_;
    118   grpc::string lb_tag_;
    119   grpc::string user_id_;
    120   grpc::string client_ip_hex_;
    121 };
    122 
    123 // The value of a load record.
    124 class LoadRecordValue {
    125  public:
    126   explicit LoadRecordValue(uint64_t start_count = 0, uint64_t ok_count = 0,
    127                            uint64_t error_count = 0, uint64_t bytes_sent = 0,
    128                            uint64_t bytes_recv = 0, uint64_t latency_ms = 0)
    129       : start_count_(start_count),
    130         ok_count_(ok_count),
    131         error_count_(error_count),
    132         bytes_sent_(bytes_sent),
    133         bytes_recv_(bytes_recv),
    134         latency_ms_(latency_ms) {}
    135 
    136   LoadRecordValue(grpc::string metric_name, uint64_t num_calls,
    137                   double total_metric_value);
    138 
    139   void MergeFrom(const LoadRecordValue& other) {
    140     start_count_ += other.start_count_;
    141     ok_count_ += other.ok_count_;
    142     error_count_ += other.error_count_;
    143     bytes_sent_ += other.bytes_sent_;
    144     bytes_recv_ += other.bytes_recv_;
    145     latency_ms_ += other.latency_ms_;
    146     for (const auto& p : other.call_metrics_) {
    147       const grpc::string& key = p.first;
    148       const CallMetricValue& value = p.second;
    149       call_metrics_[key].MergeFrom(value);
    150     }
    151   }
    152 
    153   int64_t GetNumCallsInProgressDelta() const {
    154     return static_cast<int64_t>(start_count_ - ok_count_ - error_count_);
    155   }
    156 
    157   grpc::string ToString() const {
    158     return "[start_count_=" + grpc::to_string(start_count_) +
    159            ", ok_count_=" + grpc::to_string(ok_count_) +
    160            ", error_count_=" + grpc::to_string(error_count_) +
    161            ", bytes_sent_=" + grpc::to_string(bytes_sent_) +
    162            ", bytes_recv_=" + grpc::to_string(bytes_recv_) +
    163            ", latency_ms_=" + grpc::to_string(latency_ms_) + ", " +
    164            grpc::to_string(call_metrics_.size()) + " other call metric(s)]";
    165   }
    166 
    167   bool InsertCallMetric(const grpc::string& metric_name,
    168                         const CallMetricValue& metric_value) {
    169     return call_metrics_.insert({metric_name, metric_value}).second;
    170   }
    171 
    172   // Getters.
    173   uint64_t start_count() const { return start_count_; }
    174   uint64_t ok_count() const { return ok_count_; }
    175   uint64_t error_count() const { return error_count_; }
    176   uint64_t bytes_sent() const { return bytes_sent_; }
    177   uint64_t bytes_recv() const { return bytes_recv_; }
    178   uint64_t latency_ms() const { return latency_ms_; }
    179   const std::unordered_map<grpc::string, CallMetricValue>& call_metrics()
    180       const {
    181     return call_metrics_;
    182   }
    183 
    184  private:
    185   uint64_t start_count_ = 0;
    186   uint64_t ok_count_ = 0;
    187   uint64_t error_count_ = 0;
    188   uint64_t bytes_sent_ = 0;
    189   uint64_t bytes_recv_ = 0;
    190   uint64_t latency_ms_ = 0;
    191   std::unordered_map<grpc::string, CallMetricValue> call_metrics_;
    192 };
    193 
    194 // Stores the data associated with a particular LB ID.
    195 class PerBalancerStore {
    196  public:
    197   using LoadRecordMap =
    198       std::unordered_map<LoadRecordKey, LoadRecordValue, LoadRecordKey::Hasher>;
    199 
    200   PerBalancerStore(grpc::string lb_id, grpc::string load_key)
    201       : lb_id_(std::move(lb_id)), load_key_(std::move(load_key)) {}
    202 
    203   // Merge a load record with the given key and value if the store is not
    204   // suspended.
    205   void MergeRow(const LoadRecordKey& key, const LoadRecordValue& value);
    206 
    207   // Suspend this store, so that no detailed load data will be recorded.
    208   void Suspend();
    209   // Resume this store from suspension.
    210   void Resume();
    211   // Is this store suspended or not?
    212   bool IsSuspended() const { return suspended_; }
    213 
    214   bool IsNumCallsInProgressChangedSinceLastReport() const {
    215     return num_calls_in_progress_ != last_reported_num_calls_in_progress_;
    216   }
    217 
    218   uint64_t GetNumCallsInProgressForReport();
    219 
    220   grpc::string ToString() {
    221     return "[PerBalancerStore lb_id_=" + lb_id_ + " load_key_=" + load_key_ +
    222            "]";
    223   }
    224 
    225   void ClearLoadRecordMap() { load_record_map_.clear(); }
    226 
    227   // Getters.
    228   const grpc::string& lb_id() const { return lb_id_; }
    229   const grpc::string& load_key() const { return load_key_; }
    230   const LoadRecordMap& load_record_map() const { return load_record_map_; }
    231 
    232  private:
    233   grpc::string lb_id_;
    234   // TODO(juanlishen): Use bytestring protobuf type?
    235   grpc::string load_key_;
    236   LoadRecordMap load_record_map_;
    237   uint64_t num_calls_in_progress_ = 0;
    238   uint64_t last_reported_num_calls_in_progress_ = 0;
    239   bool suspended_ = false;
    240 };
    241 
    242 // Stores the data associated with a particular host.
    243 class PerHostStore {
    244  public:
    245   // When a report stream is created, a PerBalancerStore is created for the
    246   // LB ID (guaranteed unique) associated with that stream. If it is the only
    247   // active store, adopt all the orphaned stores. If it is the first created
    248   // store, adopt the store of kInvalidLbId.
    249   void ReportStreamCreated(const grpc::string& lb_id,
    250                            const grpc::string& load_key);
    251 
    252   // When a report stream is closed, the PerBalancerStores assigned to the
    253   // associate LB ID need to be re-assigned to other active balancers,
    254   // ideally with the same load key. If there is no active balancer, we have
    255   // to suspend those stores and drop the incoming load data until they are
    256   // resumed.
    257   void ReportStreamClosed(const grpc::string& lb_id);
    258 
    259   // Returns null if not found. Caller doesn't own the returned store.
    260   PerBalancerStore* FindPerBalancerStore(const grpc::string& lb_id) const;
    261 
    262   // Returns null if lb_id is not found. The returned pointer points to the
    263   // underlying data structure, which is not owned by the caller.
    264   const std::set<PerBalancerStore*>* GetAssignedStores(
    265       const grpc::string& lb_id) const;
    266 
    267  private:
    268   // Creates a PerBalancerStore for the given LB ID, assigns the store to
    269   // itself, and records the LB ID to the load key.
    270   void SetUpForNewLbId(const grpc::string& lb_id, const grpc::string& load_key);
    271 
    272   void AssignOrphanedStore(PerBalancerStore* orphaned_store,
    273                            const grpc::string& new_receiver);
    274 
    275   std::unordered_map<grpc::string, std::set<grpc::string>>
    276       load_key_to_receiving_lb_ids_;
    277 
    278   // Key: LB ID. The key set includes all the LB IDs that have been
    279   // allocated for reporting streams so far.
    280   // Value: the unique pointer to the PerBalancerStore of the LB ID.
    281   std::unordered_map<grpc::string, std::unique_ptr<PerBalancerStore>>
    282       per_balancer_stores_;
    283 
    284   // Key: LB ID. The key set includes the LB IDs of the balancers that are
    285   // currently receiving report.
    286   // Value: the set of raw pointers to the PerBalancerStores assigned to the LB
    287   // ID. Note that the sets in assigned_stores_ form a division of the value set
    288   // of per_balancer_stores_.
    289   std::unordered_map<grpc::string, std::set<PerBalancerStore*>>
    290       assigned_stores_;
    291 };
    292 
    293 // Thread-unsafe two-level bookkeeper of all the load data.
    294 // Note: We never remove any store objects from this class, as per the
    295 // current spec. That's because premature removal of the store objects
    296 // may lead to loss of critical information, e.g., mapping from lb_id to
    297 // load_key, and the number of in-progress calls. Such loss will cause
    298 // information inconsistency when the balancer is re-connected. Keeping
    299 // all the stores should be fine for PerHostStore, since we assume there
    300 // should only be a few hostnames. But it's a potential problem for
    301 // PerBalancerStore.
    302 class LoadDataStore {
    303  public:
    304   // Returns null if not found. Caller doesn't own the returned store.
    305   PerBalancerStore* FindPerBalancerStore(const grpc::string& hostname,
    306                                          const grpc::string& lb_id) const;
    307 
    308   // Returns null if hostname or lb_id is not found. The returned pointer points
    309   // to the underlying data structure, which is not owned by the caller.
    310   const std::set<PerBalancerStore*>* GetAssignedStores(const string& hostname,
    311                                                        const string& lb_id);
    312 
    313   // If a PerBalancerStore can be found by the hostname and LB ID in
    314   // LoadRecordKey, the load data will be merged to that store. Otherwise,
    315   // only track the number of the in-progress calls for this unknown LB ID.
    316   void MergeRow(const grpc::string& hostname, const LoadRecordKey& key,
    317                 const LoadRecordValue& value);
    318 
    319   // Is the given lb_id a tracked unknown LB ID (i.e., the LB ID was associated
    320   // with some received load data but unknown to this load data store)?
    321   bool IsTrackedUnknownBalancerId(const grpc::string& lb_id) const {
    322     return unknown_balancer_id_trackers_.find(lb_id) !=
    323            unknown_balancer_id_trackers_.end();
    324   }
    325 
    326   // Wrapper around PerHostStore::ReportStreamCreated.
    327   void ReportStreamCreated(const grpc::string& hostname,
    328                            const grpc::string& lb_id,
    329                            const grpc::string& load_key);
    330 
    331   // Wrapper around PerHostStore::ReportStreamClosed.
    332   void ReportStreamClosed(const grpc::string& hostname,
    333                           const grpc::string& lb_id);
    334 
    335  private:
    336   // Buffered data that was fetched from Census but hasn't been sent to
    337   // balancer. We need to keep this data ourselves because Census will
    338   // delete the data once it's returned.
    339   std::unordered_map<grpc::string, PerHostStore> per_host_stores_;
    340 
    341   // Tracks the number of in-progress calls for each unknown LB ID.
    342   std::unordered_map<grpc::string, uint64_t> unknown_balancer_id_trackers_;
    343 };
    344 
    345 }  // namespace load_reporter
    346 }  // namespace grpc
    347 
    348 #endif  // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_DATA_STORE_H
    349