1 /* 2 * 3 * Copyright 2018 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #include <grpc/impl/codegen/port_platform.h> 20 21 #include <stdio.h> 22 #include <cstdlib> 23 #include <set> 24 #include <unordered_map> 25 #include <vector> 26 27 #include "src/core/lib/iomgr/socket_utils.h" 28 #include "src/cpp/server/load_reporter/load_data_store.h" 29 30 namespace grpc { 31 namespace load_reporter { 32 33 // Some helper functions. 34 namespace { 35 36 // Given a map from type K to a set of value type V, finds the set associated 37 // with the given key and erases the value from the set. If the set becomes 38 // empty, also erases the key-set pair. Returns true if the value is erased 39 // successfully. 40 template <typename K, typename V> 41 bool UnorderedMapOfSetEraseKeyValue(std::unordered_map<K, std::set<V>>& map, 42 const K& key, const V& value) { 43 auto it = map.find(key); 44 if (it != map.end()) { 45 size_t erased = it->second.erase(value); 46 if (it->second.size() == 0) { 47 map.erase(it); 48 } 49 return erased; 50 } 51 return false; 52 }; 53 54 // Given a map from type K to a set of value type V, removes the given key and 55 // the associated set, and returns the set. Returns an empty set if the key is 56 // not found. 57 template <typename K, typename V> 58 std::set<V> UnorderedMapOfSetExtract(std::unordered_map<K, std::set<V>>& map, 59 const K& key) { 60 auto it = map.find(key); 61 if (it != map.end()) { 62 auto set = std::move(it->second); 63 map.erase(it); 64 return set; 65 } 66 return {}; 67 }; 68 69 // From a non-empty container, returns a pointer to a random element. 70 template <typename C> 71 const typename C::value_type* RandomElement(const C& container) { 72 GPR_ASSERT(!container.empty()); 73 auto it = container.begin(); 74 std::advance(it, std::rand() % container.size()); 75 return &(*it); 76 } 77 78 } // namespace 79 80 LoadRecordKey::LoadRecordKey(const grpc::string& client_ip_and_token, 81 grpc::string user_id) 82 : user_id_(std::move(user_id)) { 83 GPR_ASSERT(client_ip_and_token.size() >= 2); 84 int ip_hex_size; 85 GPR_ASSERT(sscanf(client_ip_and_token.substr(0, 2).c_str(), "%d", 86 &ip_hex_size) == 1); 87 GPR_ASSERT(ip_hex_size == 0 || ip_hex_size == kIpv4AddressLength || 88 ip_hex_size == kIpv6AddressLength); 89 size_t cur_pos = 2; 90 client_ip_hex_ = client_ip_and_token.substr(cur_pos, ip_hex_size); 91 cur_pos += ip_hex_size; 92 if (client_ip_and_token.size() - cur_pos < kLbIdLength) { 93 lb_id_ = kInvalidLbId; 94 lb_tag_ = ""; 95 } else { 96 lb_id_ = client_ip_and_token.substr(cur_pos, kLbIdLength); 97 lb_tag_ = client_ip_and_token.substr(cur_pos + kLbIdLength); 98 } 99 } 100 101 grpc::string LoadRecordKey::GetClientIpBytes() const { 102 if (client_ip_hex_.empty()) { 103 return ""; 104 } else if (client_ip_hex_.size() == kIpv4AddressLength) { 105 uint32_t ip_bytes; 106 if (sscanf(client_ip_hex_.c_str(), "%x", &ip_bytes) != 1) { 107 gpr_log(GPR_ERROR, 108 "Can't parse client IP (%s) from a hex string to an integer.", 109 client_ip_hex_.c_str()); 110 return ""; 111 } 112 ip_bytes = grpc_htonl(ip_bytes); 113 return grpc::string(reinterpret_cast<const char*>(&ip_bytes), 114 sizeof(ip_bytes)); 115 } else if (client_ip_hex_.size() == kIpv6AddressLength) { 116 uint32_t ip_bytes[4]; 117 for (size_t i = 0; i < 4; ++i) { 118 if (sscanf(client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str(), "%x", 119 ip_bytes + i) != 1) { 120 gpr_log( 121 GPR_ERROR, 122 "Can't parse client IP part (%s) from a hex string to an integer.", 123 client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str()); 124 return ""; 125 } 126 ip_bytes[i] = grpc_htonl(ip_bytes[i]); 127 } 128 return grpc::string(reinterpret_cast<const char*>(ip_bytes), 129 sizeof(ip_bytes)); 130 } else { 131 GPR_UNREACHABLE_CODE(return ""); 132 } 133 } 134 135 LoadRecordValue::LoadRecordValue(grpc::string metric_name, uint64_t num_calls, 136 double total_metric_value) { 137 call_metrics_.emplace(std::move(metric_name), 138 CallMetricValue(num_calls, total_metric_value)); 139 } 140 141 void PerBalancerStore::MergeRow(const LoadRecordKey& key, 142 const LoadRecordValue& value) { 143 // During suspension, the load data received will be dropped. 144 if (!suspended_) { 145 load_record_map_[key].MergeFrom(value); 146 gpr_log(GPR_DEBUG, 147 "[PerBalancerStore %p] Load data merged (Key: %s, Value: %s).", 148 this, key.ToString().c_str(), value.ToString().c_str()); 149 } else { 150 gpr_log(GPR_DEBUG, 151 "[PerBalancerStore %p] Load data dropped (Key: %s, Value: %s).", 152 this, key.ToString().c_str(), value.ToString().c_str()); 153 } 154 // We always keep track of num_calls_in_progress_, so that when this 155 // store is resumed, we still have a correct value of 156 // num_calls_in_progress_. 157 GPR_ASSERT(static_cast<int64_t>(num_calls_in_progress_) + 158 value.GetNumCallsInProgressDelta() >= 159 0); 160 num_calls_in_progress_ += value.GetNumCallsInProgressDelta(); 161 } 162 163 void PerBalancerStore::Suspend() { 164 suspended_ = true; 165 load_record_map_.clear(); 166 gpr_log(GPR_DEBUG, "[PerBalancerStore %p] Suspended.", this); 167 } 168 169 void PerBalancerStore::Resume() { 170 suspended_ = false; 171 gpr_log(GPR_DEBUG, "[PerBalancerStore %p] Resumed.", this); 172 } 173 174 uint64_t PerBalancerStore::GetNumCallsInProgressForReport() { 175 GPR_ASSERT(!suspended_); 176 last_reported_num_calls_in_progress_ = num_calls_in_progress_; 177 return num_calls_in_progress_; 178 } 179 180 void PerHostStore::ReportStreamCreated(const grpc::string& lb_id, 181 const grpc::string& load_key) { 182 GPR_ASSERT(lb_id != kInvalidLbId); 183 SetUpForNewLbId(lb_id, load_key); 184 // Prior to this one, there was no load balancer receiving report, so we may 185 // have unassigned orphaned stores to assign to this new balancer. 186 // TODO(juanlishen): If the load key of this new stream is the same with 187 // some previously adopted orphan store, we may want to take the orphan to 188 // this stream. Need to discuss with LB team. 189 if (assigned_stores_.size() == 1) { 190 for (const auto& p : per_balancer_stores_) { 191 const grpc::string& other_lb_id = p.first; 192 const std::unique_ptr<PerBalancerStore>& orphaned_store = p.second; 193 if (other_lb_id != lb_id) { 194 orphaned_store->Resume(); 195 AssignOrphanedStore(orphaned_store.get(), lb_id); 196 } 197 } 198 } 199 // The first connected balancer will adopt the kInvalidLbId. 200 if (per_balancer_stores_.size() == 1) { 201 SetUpForNewLbId(kInvalidLbId, ""); 202 ReportStreamClosed(kInvalidLbId); 203 } 204 } 205 206 void PerHostStore::ReportStreamClosed(const grpc::string& lb_id) { 207 auto it_store_for_gone_lb = per_balancer_stores_.find(lb_id); 208 GPR_ASSERT(it_store_for_gone_lb != per_balancer_stores_.end()); 209 // Remove this closed stream from our records. 210 GPR_ASSERT(UnorderedMapOfSetEraseKeyValue( 211 load_key_to_receiving_lb_ids_, it_store_for_gone_lb->second->load_key(), 212 lb_id)); 213 std::set<PerBalancerStore*> orphaned_stores = 214 UnorderedMapOfSetExtract(assigned_stores_, lb_id); 215 // The stores that were assigned to this balancer are orphaned now. They 216 // should be re-assigned to other balancers which are still receiving reports. 217 for (PerBalancerStore* orphaned_store : orphaned_stores) { 218 const grpc::string* new_receiver = nullptr; 219 auto it = load_key_to_receiving_lb_ids_.find(orphaned_store->load_key()); 220 if (it != load_key_to_receiving_lb_ids_.end()) { 221 // First, try to pick from the active balancers with the same load key. 222 new_receiver = RandomElement(it->second); 223 } else if (!assigned_stores_.empty()) { 224 // If failed, pick from all the remaining active balancers. 225 new_receiver = &(RandomElement(assigned_stores_)->first); 226 } 227 if (new_receiver != nullptr) { 228 AssignOrphanedStore(orphaned_store, *new_receiver); 229 } else { 230 // Load data for an LB ID that can't be assigned to any stream should 231 // be dropped. 232 orphaned_store->Suspend(); 233 } 234 } 235 } 236 237 PerBalancerStore* PerHostStore::FindPerBalancerStore( 238 const grpc::string& lb_id) const { 239 return per_balancer_stores_.find(lb_id) != per_balancer_stores_.end() 240 ? per_balancer_stores_.find(lb_id)->second.get() 241 : nullptr; 242 } 243 244 const std::set<PerBalancerStore*>* PerHostStore::GetAssignedStores( 245 const grpc::string& lb_id) const { 246 auto it = assigned_stores_.find(lb_id); 247 if (it == assigned_stores_.end()) return nullptr; 248 return &(it->second); 249 } 250 251 void PerHostStore::AssignOrphanedStore(PerBalancerStore* orphaned_store, 252 const grpc::string& new_receiver) { 253 auto it = assigned_stores_.find(new_receiver); 254 GPR_ASSERT(it != assigned_stores_.end()); 255 it->second.insert(orphaned_store); 256 gpr_log(GPR_INFO, 257 "[PerHostStore %p] Re-assigned orphaned store (%p) with original LB" 258 " ID of %s to new receiver %s", 259 this, orphaned_store, orphaned_store->lb_id().c_str(), 260 new_receiver.c_str()); 261 } 262 263 void PerHostStore::SetUpForNewLbId(const grpc::string& lb_id, 264 const grpc::string& load_key) { 265 // The top-level caller (i.e., LoadReportService) should guarantee the 266 // lb_id is unique for each reporting stream. 267 GPR_ASSERT(per_balancer_stores_.find(lb_id) == per_balancer_stores_.end()); 268 GPR_ASSERT(assigned_stores_.find(lb_id) == assigned_stores_.end()); 269 load_key_to_receiving_lb_ids_[load_key].insert(lb_id); 270 std::unique_ptr<PerBalancerStore> per_balancer_store( 271 new PerBalancerStore(lb_id, load_key)); 272 assigned_stores_[lb_id] = {per_balancer_store.get()}; 273 per_balancer_stores_[lb_id] = std::move(per_balancer_store); 274 } 275 276 PerBalancerStore* LoadDataStore::FindPerBalancerStore( 277 const string& hostname, const string& lb_id) const { 278 auto it = per_host_stores_.find(hostname); 279 if (it != per_host_stores_.end()) { 280 const PerHostStore& per_host_store = it->second; 281 return per_host_store.FindPerBalancerStore(lb_id); 282 } else { 283 return nullptr; 284 } 285 } 286 287 void LoadDataStore::MergeRow(const grpc::string& hostname, 288 const LoadRecordKey& key, 289 const LoadRecordValue& value) { 290 PerBalancerStore* per_balancer_store = 291 FindPerBalancerStore(hostname, key.lb_id()); 292 if (per_balancer_store != nullptr) { 293 per_balancer_store->MergeRow(key, value); 294 return; 295 } 296 // Unknown LB ID. Track it until its number of in-progress calls drops to 297 // zero. 298 int64_t in_progress_delta = value.GetNumCallsInProgressDelta(); 299 if (in_progress_delta != 0) { 300 auto it_tracker = unknown_balancer_id_trackers_.find(key.lb_id()); 301 if (it_tracker == unknown_balancer_id_trackers_.end()) { 302 gpr_log( 303 GPR_DEBUG, 304 "[LoadDataStore %p] Start tracking unknown balancer (lb_id_: %s).", 305 this, key.lb_id().c_str()); 306 unknown_balancer_id_trackers_.insert( 307 {key.lb_id(), static_cast<uint64_t>(in_progress_delta)}); 308 } else if ((it_tracker->second += in_progress_delta) == 0) { 309 unknown_balancer_id_trackers_.erase(it_tracker); 310 gpr_log(GPR_DEBUG, 311 "[LoadDataStore %p] Stop tracking unknown balancer (lb_id_: %s).", 312 this, key.lb_id().c_str()); 313 } 314 } 315 } 316 317 const std::set<PerBalancerStore*>* LoadDataStore::GetAssignedStores( 318 const grpc::string& hostname, const grpc::string& lb_id) { 319 auto it = per_host_stores_.find(hostname); 320 if (it == per_host_stores_.end()) return nullptr; 321 return it->second.GetAssignedStores(lb_id); 322 } 323 324 void LoadDataStore::ReportStreamCreated(const grpc::string& hostname, 325 const grpc::string& lb_id, 326 const grpc::string& load_key) { 327 per_host_stores_[hostname].ReportStreamCreated(lb_id, load_key); 328 } 329 330 void LoadDataStore::ReportStreamClosed(const grpc::string& hostname, 331 const grpc::string& lb_id) { 332 auto it_per_host_store = per_host_stores_.find(hostname); 333 GPR_ASSERT(it_per_host_store != per_host_stores_.end()); 334 it_per_host_store->second.ReportStreamClosed(lb_id); 335 } 336 337 } // namespace load_reporter 338 } // namespace grpc 339