Home | History | Annotate | Download | only in load_reporter
      1 /*
      2  *
      3  * Copyright 2018 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #include <grpc/impl/codegen/port_platform.h>
     20 
     21 #include <stdio.h>
     22 #include <cstdlib>
     23 #include <set>
     24 #include <unordered_map>
     25 #include <vector>
     26 
     27 #include "src/core/lib/iomgr/socket_utils.h"
     28 #include "src/cpp/server/load_reporter/load_data_store.h"
     29 
     30 namespace grpc {
     31 namespace load_reporter {
     32 
     33 // Some helper functions.
     34 namespace {
     35 
     36 // Given a map from type K to a set of value type V, finds the set associated
     37 // with the given key and erases the value from the set. If the set becomes
     38 // empty, also erases the key-set pair. Returns true if the value is erased
     39 // successfully.
     40 template <typename K, typename V>
     41 bool UnorderedMapOfSetEraseKeyValue(std::unordered_map<K, std::set<V>>& map,
     42                                     const K& key, const V& value) {
     43   auto it = map.find(key);
     44   if (it != map.end()) {
     45     size_t erased = it->second.erase(value);
     46     if (it->second.size() == 0) {
     47       map.erase(it);
     48     }
     49     return erased;
     50   }
     51   return false;
     52 };
     53 
     54 // Given a map from type K to a set of value type V, removes the given key and
     55 // the associated set, and returns the set. Returns an empty set if the key is
     56 // not found.
     57 template <typename K, typename V>
     58 std::set<V> UnorderedMapOfSetExtract(std::unordered_map<K, std::set<V>>& map,
     59                                      const K& key) {
     60   auto it = map.find(key);
     61   if (it != map.end()) {
     62     auto set = std::move(it->second);
     63     map.erase(it);
     64     return set;
     65   }
     66   return {};
     67 };
     68 
     69 // From a non-empty container, returns a pointer to a random element.
     70 template <typename C>
     71 const typename C::value_type* RandomElement(const C& container) {
     72   GPR_ASSERT(!container.empty());
     73   auto it = container.begin();
     74   std::advance(it, std::rand() % container.size());
     75   return &(*it);
     76 }
     77 
     78 }  // namespace
     79 
     80 LoadRecordKey::LoadRecordKey(const grpc::string& client_ip_and_token,
     81                              grpc::string user_id)
     82     : user_id_(std::move(user_id)) {
     83   GPR_ASSERT(client_ip_and_token.size() >= 2);
     84   int ip_hex_size;
     85   GPR_ASSERT(sscanf(client_ip_and_token.substr(0, 2).c_str(), "%d",
     86                     &ip_hex_size) == 1);
     87   GPR_ASSERT(ip_hex_size == 0 || ip_hex_size == kIpv4AddressLength ||
     88              ip_hex_size == kIpv6AddressLength);
     89   size_t cur_pos = 2;
     90   client_ip_hex_ = client_ip_and_token.substr(cur_pos, ip_hex_size);
     91   cur_pos += ip_hex_size;
     92   if (client_ip_and_token.size() - cur_pos < kLbIdLength) {
     93     lb_id_ = kInvalidLbId;
     94     lb_tag_ = "";
     95   } else {
     96     lb_id_ = client_ip_and_token.substr(cur_pos, kLbIdLength);
     97     lb_tag_ = client_ip_and_token.substr(cur_pos + kLbIdLength);
     98   }
     99 }
    100 
    101 grpc::string LoadRecordKey::GetClientIpBytes() const {
    102   if (client_ip_hex_.empty()) {
    103     return "";
    104   } else if (client_ip_hex_.size() == kIpv4AddressLength) {
    105     uint32_t ip_bytes;
    106     if (sscanf(client_ip_hex_.c_str(), "%x", &ip_bytes) != 1) {
    107       gpr_log(GPR_ERROR,
    108               "Can't parse client IP (%s) from a hex string to an integer.",
    109               client_ip_hex_.c_str());
    110       return "";
    111     }
    112     ip_bytes = grpc_htonl(ip_bytes);
    113     return grpc::string(reinterpret_cast<const char*>(&ip_bytes),
    114                         sizeof(ip_bytes));
    115   } else if (client_ip_hex_.size() == kIpv6AddressLength) {
    116     uint32_t ip_bytes[4];
    117     for (size_t i = 0; i < 4; ++i) {
    118       if (sscanf(client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str(), "%x",
    119                  ip_bytes + i) != 1) {
    120         gpr_log(
    121             GPR_ERROR,
    122             "Can't parse client IP part (%s) from a hex string to an integer.",
    123             client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str());
    124         return "";
    125       }
    126       ip_bytes[i] = grpc_htonl(ip_bytes[i]);
    127     }
    128     return grpc::string(reinterpret_cast<const char*>(ip_bytes),
    129                         sizeof(ip_bytes));
    130   } else {
    131     GPR_UNREACHABLE_CODE(return "");
    132   }
    133 }
    134 
    135 LoadRecordValue::LoadRecordValue(grpc::string metric_name, uint64_t num_calls,
    136                                  double total_metric_value) {
    137   call_metrics_.emplace(std::move(metric_name),
    138                         CallMetricValue(num_calls, total_metric_value));
    139 }
    140 
    141 void PerBalancerStore::MergeRow(const LoadRecordKey& key,
    142                                 const LoadRecordValue& value) {
    143   // During suspension, the load data received will be dropped.
    144   if (!suspended_) {
    145     load_record_map_[key].MergeFrom(value);
    146     gpr_log(GPR_DEBUG,
    147             "[PerBalancerStore %p] Load data merged (Key: %s, Value: %s).",
    148             this, key.ToString().c_str(), value.ToString().c_str());
    149   } else {
    150     gpr_log(GPR_DEBUG,
    151             "[PerBalancerStore %p] Load data dropped (Key: %s, Value: %s).",
    152             this, key.ToString().c_str(), value.ToString().c_str());
    153   }
    154   // We always keep track of num_calls_in_progress_, so that when this
    155   // store is resumed, we still have a correct value of
    156   // num_calls_in_progress_.
    157   GPR_ASSERT(static_cast<int64_t>(num_calls_in_progress_) +
    158                  value.GetNumCallsInProgressDelta() >=
    159              0);
    160   num_calls_in_progress_ += value.GetNumCallsInProgressDelta();
    161 }
    162 
    163 void PerBalancerStore::Suspend() {
    164   suspended_ = true;
    165   load_record_map_.clear();
    166   gpr_log(GPR_DEBUG, "[PerBalancerStore %p] Suspended.", this);
    167 }
    168 
    169 void PerBalancerStore::Resume() {
    170   suspended_ = false;
    171   gpr_log(GPR_DEBUG, "[PerBalancerStore %p] Resumed.", this);
    172 }
    173 
    174 uint64_t PerBalancerStore::GetNumCallsInProgressForReport() {
    175   GPR_ASSERT(!suspended_);
    176   last_reported_num_calls_in_progress_ = num_calls_in_progress_;
    177   return num_calls_in_progress_;
    178 }
    179 
// Registers a new load-reporting stream from balancer `lb_id` with `load_key`.
// - SetUpForNewLbId() creates a PerBalancerStore for `lb_id` and assigns that
//   store to `lb_id` itself.
// - If `lb_id` is now the only stream in assigned_stores_, every other store
//   in per_balancer_stores_ is resumed and assigned to `lb_id`.
// - If this is the first store ever created on this host, a store for
//   kInvalidLbId is also created and handed over to `lb_id`.
void PerHostStore::ReportStreamCreated(const grpc::string& lb_id,
                                       const grpc::string& load_key) {
  GPR_ASSERT(lb_id != kInvalidLbId);
  SetUpForNewLbId(lb_id, load_key);
  // Prior to this one, there was no load balancer receiving report, so we may
  // have unassigned orphaned stores to assign to this new balancer.
  // TODO(juanlishen): If the load key of this new stream is the same with
  // some previously adopted orphan store, we may want to take the orphan to
  // this stream. Need to discuss with LB team.
  // Entries in assigned_stores_ are added by SetUpForNewLbId() and removed by
  // ReportStreamClosed(), so a size of 1 means the stream just set up is the
  // only one currently receiving reports.
  if (assigned_stores_.size() == 1) {
    for (const auto& p : per_balancer_stores_) {
      const grpc::string& other_lb_id = p.first;
      const std::unique_ptr<PerBalancerStore>& orphaned_store = p.second;
      if (other_lb_id != lb_id) {
        orphaned_store->Resume();
        AssignOrphanedStore(orphaned_store.get(), lb_id);
      }
    }
  }
  // The first connected balancer will adopt the kInvalidLbId.
  // The kInvalidLbId store is set up as if it had its own stream, and that
  // stream is closed immediately. ReportStreamClosed() then re-assigns the
  // orphaned store; at this point `lb_id` is the only active receiver, so it
  // is the one that adopts it.
  if (per_balancer_stores_.size() == 1) {
    SetUpForNewLbId(kInvalidLbId, "");
    ReportStreamClosed(kInvalidLbId);
  }
}
    205 
// Handles the end of the load-reporting stream for balancer `lb_id`.
// - Removes `lb_id` from the receivers of its load key and takes back every
//   store that was assigned to it, including its own.
// - Re-assigns each of those stores to a still-active balancer, preferring
//   one with the same load key.
// - Suspends any store that cannot be re-assigned.
// The PerBalancerStore for `lb_id` is not deleted here. `lb_id` must be known
// to this host (GPR_ASSERT).
void PerHostStore::ReportStreamClosed(const grpc::string& lb_id) {
  auto it_store_for_gone_lb = per_balancer_stores_.find(lb_id);
  GPR_ASSERT(it_store_for_gone_lb != per_balancer_stores_.end());
  // Remove this closed stream from our records.
  // NOTE(review): the erase happens inside the GPR_ASSERT argument, so this
  // relies on GPR_ASSERT evaluating its argument in every build mode.
  GPR_ASSERT(UnorderedMapOfSetEraseKeyValue(
      load_key_to_receiving_lb_ids_, it_store_for_gone_lb->second->load_key(),
      lb_id));
  std::set<PerBalancerStore*> orphaned_stores =
      UnorderedMapOfSetExtract(assigned_stores_, lb_id);
  // The stores that were assigned to this balancer are orphaned now. They
  // should be re-assigned to other balancers which are still receiving reports.
  for (PerBalancerStore* orphaned_store : orphaned_stores) {
    // Points at a key stored in one of the two maps below. It stays valid
    // because no entries are added to or removed from either map while the
    // pointer is in use.
    const grpc::string* new_receiver = nullptr;
    auto it = load_key_to_receiving_lb_ids_.find(orphaned_store->load_key());
    if (it != load_key_to_receiving_lb_ids_.end()) {
      // First, try to pick from the active balancers with the same load key.
      // The set is non-empty: UnorderedMapOfSetEraseKeyValue drops a key as
      // soon as its set becomes empty, and RandomElement asserts non-empty.
      new_receiver = RandomElement(it->second);
    } else if (!assigned_stores_.empty()) {
      // If failed, pick from all the remaining active balancers.
      new_receiver = &(RandomElement(assigned_stores_)->first);
    }
    if (new_receiver != nullptr) {
      AssignOrphanedStore(orphaned_store, *new_receiver);
    } else {
      // Load data for an LB ID that can't be assigned to any stream should
      // be dropped.
      orphaned_store->Suspend();
    }
  }
}
    236 
    237 PerBalancerStore* PerHostStore::FindPerBalancerStore(
    238     const grpc::string& lb_id) const {
    239   return per_balancer_stores_.find(lb_id) != per_balancer_stores_.end()
    240              ? per_balancer_stores_.find(lb_id)->second.get()
    241              : nullptr;
    242 }
    243 
    244 const std::set<PerBalancerStore*>* PerHostStore::GetAssignedStores(
    245     const grpc::string& lb_id) const {
    246   auto it = assigned_stores_.find(lb_id);
    247   if (it == assigned_stores_.end()) return nullptr;
    248   return &(it->second);
    249 }
    250 
    251 void PerHostStore::AssignOrphanedStore(PerBalancerStore* orphaned_store,
    252                                        const grpc::string& new_receiver) {
    253   auto it = assigned_stores_.find(new_receiver);
    254   GPR_ASSERT(it != assigned_stores_.end());
    255   it->second.insert(orphaned_store);
    256   gpr_log(GPR_INFO,
    257           "[PerHostStore %p] Re-assigned orphaned store (%p) with original LB"
    258           " ID of %s to new receiver %s",
    259           this, orphaned_store, orphaned_store->lb_id().c_str(),
    260           new_receiver.c_str());
    261 }
    262 
    263 void PerHostStore::SetUpForNewLbId(const grpc::string& lb_id,
    264                                    const grpc::string& load_key) {
    265   // The top-level caller (i.e., LoadReportService) should guarantee the
    266   // lb_id is unique for each reporting stream.
    267   GPR_ASSERT(per_balancer_stores_.find(lb_id) == per_balancer_stores_.end());
    268   GPR_ASSERT(assigned_stores_.find(lb_id) == assigned_stores_.end());
    269   load_key_to_receiving_lb_ids_[load_key].insert(lb_id);
    270   std::unique_ptr<PerBalancerStore> per_balancer_store(
    271       new PerBalancerStore(lb_id, load_key));
    272   assigned_stores_[lb_id] = {per_balancer_store.get()};
    273   per_balancer_stores_[lb_id] = std::move(per_balancer_store);
    274 }
    275 
    276 PerBalancerStore* LoadDataStore::FindPerBalancerStore(
    277     const string& hostname, const string& lb_id) const {
    278   auto it = per_host_stores_.find(hostname);
    279   if (it != per_host_stores_.end()) {
    280     const PerHostStore& per_host_store = it->second;
    281     return per_host_store.FindPerBalancerStore(lb_id);
    282   } else {
    283     return nullptr;
    284   }
    285 }
    286 
    287 void LoadDataStore::MergeRow(const grpc::string& hostname,
    288                              const LoadRecordKey& key,
    289                              const LoadRecordValue& value) {
    290   PerBalancerStore* per_balancer_store =
    291       FindPerBalancerStore(hostname, key.lb_id());
    292   if (per_balancer_store != nullptr) {
    293     per_balancer_store->MergeRow(key, value);
    294     return;
    295   }
    296   // Unknown LB ID. Track it until its number of in-progress calls drops to
    297   // zero.
    298   int64_t in_progress_delta = value.GetNumCallsInProgressDelta();
    299   if (in_progress_delta != 0) {
    300     auto it_tracker = unknown_balancer_id_trackers_.find(key.lb_id());
    301     if (it_tracker == unknown_balancer_id_trackers_.end()) {
    302       gpr_log(
    303           GPR_DEBUG,
    304           "[LoadDataStore %p] Start tracking unknown balancer (lb_id_: %s).",
    305           this, key.lb_id().c_str());
    306       unknown_balancer_id_trackers_.insert(
    307           {key.lb_id(), static_cast<uint64_t>(in_progress_delta)});
    308     } else if ((it_tracker->second += in_progress_delta) == 0) {
    309       unknown_balancer_id_trackers_.erase(it_tracker);
    310       gpr_log(GPR_DEBUG,
    311               "[LoadDataStore %p] Stop tracking unknown balancer (lb_id_: %s).",
    312               this, key.lb_id().c_str());
    313     }
    314   }
    315 }
    316 
    317 const std::set<PerBalancerStore*>* LoadDataStore::GetAssignedStores(
    318     const grpc::string& hostname, const grpc::string& lb_id) {
    319   auto it = per_host_stores_.find(hostname);
    320   if (it == per_host_stores_.end()) return nullptr;
    321   return it->second.GetAssignedStores(lb_id);
    322 }
    323 
    324 void LoadDataStore::ReportStreamCreated(const grpc::string& hostname,
    325                                         const grpc::string& lb_id,
    326                                         const grpc::string& load_key) {
    327   per_host_stores_[hostname].ReportStreamCreated(lb_id, load_key);
    328 }
    329 
    330 void LoadDataStore::ReportStreamClosed(const grpc::string& hostname,
    331                                        const grpc::string& lb_id) {
    332   auto it_per_host_store = per_host_stores_.find(hostname);
    333   GPR_ASSERT(it_per_host_store != per_host_stores_.end());
    334   it_per_host_store->second.ReportStreamClosed(lb_id);
    335 }
    336 
    337 }  // namespace load_reporter
    338 }  // namespace grpc
    339