/*
 *
 * Copyright 2016 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

/// Implementation of the gRPC LB policy.
///
/// This policy takes as input a list of resolved addresses, which must
/// include at least one balancer address.
///
/// An internal channel (\a lb_channel_) is created for the addresses
/// that are balancers. This channel behaves just like a regular
/// channel that uses pick_first to select from the list of balancer
/// addresses.
///
/// The first time the policy gets a request for a pick, a ping, or to exit
/// the idle state, \a StartPickingLocked() is called. This method is
/// responsible for instantiating the internal *streaming* call to the LB
/// server (whichever address pick_first chose). The call will be complete
/// when either the balancer sends status or when we cancel the call (e.g.,
/// because we are shutting down). If needed, we retry the call. If we
/// received at least one valid message from the server, a new call attempt
/// will be made immediately; otherwise, we apply back-off delays between
/// attempts.
///
/// We maintain an internal round_robin policy instance for distributing
/// requests across backends. Whenever we receive a new serverlist from
/// the balancer, we update the round_robin policy with the new list of
/// addresses. If we cannot communicate with the balancer on startup,
/// however, we may enter fallback mode, in which case we will populate
/// the RR policy's addresses from the backend addresses returned by the
/// resolver.
///
/// Once an RR policy instance is in place (and getting updated as described),
/// calls for a pick, a ping, or a cancellation will be serviced right
/// away by forwarding them to the RR instance. Any time there's no RR
/// policy available (i.e., right after the creation of the gRPCLB policy),
/// pick and ping requests are added to a list of pending picks and pings
/// to be flushed and serviced when the RR policy instance becomes available.
///
/// \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
/// high level design and details.

// With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
// using that endpoint. Because of various transitive includes in uv.h,
// including windows.h on Windows, uv.h must be included before other system
// headers. Therefore, sockaddr.h must always be included first.
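
// An illustrative sketch of the control flow described above (a summary only,
// not an exhaustive list of entry points):
//   pick / ping / exit-idle -> StartPickingLocked() -> BalancerCallState::StartQuery()
//   balancer message        -> OnBalancerMessageReceivedLocked() -> serverlist_
//                              -> CreateOrUpdateRoundRobinPolicyLocked()
//   fallback timeout        -> OnFallbackTimerLocked() -> RR built from resolver backends
//   balancer call ends      -> OnBalancerStatusReceivedLocked() -> restart immediately if a
//                              message was received, else retry with backoff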
#include <grpc/support/port_platform.h>

#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/socket_utils.h"

#include <inttypes.h>
#include <limits.h>
#include <string.h>

#include <grpc/byte_buffer_reader.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>

#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/client_channel/client_channel_factory.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h"
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/gpr/host_port.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/slice/slice_hash_table.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/static_metadata.h"

#define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_GRPCLB_RECONNECT_JITTER 0.2
#define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000

namespace grpc_core {

TraceFlag grpc_lb_glb_trace(false, "glb");

namespace {

class GrpcLb : public LoadBalancingPolicy {
 public:
  GrpcLb(const grpc_lb_addresses* addresses, const Args& args);

  void UpdateLocked(const grpc_channel_args& args) override;
  bool PickLocked(PickState* pick, grpc_error** error) override;
  void CancelPickLocked(PickState* pick, grpc_error* error) override;
  void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
                                 uint32_t initial_metadata_flags_eq,
                                 grpc_error* error) override;
  void NotifyOnStateChangeLocked(grpc_connectivity_state* state,
                                 grpc_closure* closure) override;
  grpc_connectivity_state CheckConnectivityLocked(
      grpc_error** connectivity_error) override;
  void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
  void ExitIdleLocked() override;
  void ResetBackoffLocked() override;
  void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
                                ChildRefsList* child_channels) override;

 private:
  /// Linked list of pending pick requests. It stores all information needed to
  /// eventually call (Round Robin's) pick() on them. They mainly stay pending
  /// waiting for the RR policy to be created.
  ///
  /// Note that when a pick is sent to the RR policy, we inject our own
  /// on_complete callback, so that we can intercept the result before
  /// invoking the original on_complete callback. This allows us to set the
  /// LB token metadata and add client_stats to the call context.
  /// See \a OnPendingPickComplete() for details.
  struct PendingPick {
    // The grpclb instance that created the wrapping. This instance is not
    // owned; reference counts are untouched. It's used only for logging
    // purposes.
    GrpcLb* grpclb_policy;
    // The original pick.
    PickState* pick;
    // Our on_complete closure and the original one.
    grpc_closure on_complete;
    grpc_closure* original_on_complete;
    // The LB token associated with the pick. This is set via user_data in
    // the pick.
    grpc_mdelem lb_token;
    // Stats for client-side load reporting.
    RefCountedPtr<GrpcLbClientStats> client_stats;
    // Next pending pick.
    PendingPick* next = nullptr;
  };

  /// Contains a call to the LB server and all the data related to the call.
  class BalancerCallState
      : public InternallyRefCountedWithTracing<BalancerCallState> {
   public:
    explicit BalancerCallState(
        RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy);

    // It's the caller's responsibility to ensure that Orphan() is called from
    // inside the combiner.
    void Orphan() override;

    void StartQuery();

    GrpcLbClientStats* client_stats() const { return client_stats_.get(); }

    bool seen_initial_response() const { return seen_initial_response_; }

   private:
    // So Delete() can access our private dtor.
    template <typename T>
    friend void grpc_core::Delete(T*);

    ~BalancerCallState();

    GrpcLb* grpclb_policy() const {
      return static_cast<GrpcLb*>(grpclb_policy_.get());
    }

    void ScheduleNextClientLoadReportLocked();
    void SendClientLoadReportLocked();

    static bool LoadReportCountersAreZero(grpc_grpclb_request* request);

    static void MaybeSendClientLoadReportLocked(void* arg, grpc_error* error);
    static void ClientLoadReportDoneLocked(void* arg, grpc_error* error);
    static void OnInitialRequestSentLocked(void* arg, grpc_error* error);
    static void OnBalancerMessageReceivedLocked(void* arg, grpc_error* error);
    static void OnBalancerStatusReceivedLocked(void* arg, grpc_error* error);

    // The owning LB policy.
    RefCountedPtr<LoadBalancingPolicy> grpclb_policy_;

    // The streaming call to the LB server. Always non-NULL.
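    // (Created in the ctor and unreffed in the dtor, so it is non-null for the
    // lifetime of this object.)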
    grpc_call* lb_call_ = nullptr;

    // recv_initial_metadata
    grpc_metadata_array lb_initial_metadata_recv_;

    // send_message
    grpc_byte_buffer* send_message_payload_ = nullptr;
    grpc_closure lb_on_initial_request_sent_;

    // recv_message
    grpc_byte_buffer* recv_message_payload_ = nullptr;
    grpc_closure lb_on_balancer_message_received_;
    bool seen_initial_response_ = false;

    // recv_trailing_metadata
    grpc_closure lb_on_balancer_status_received_;
    grpc_metadata_array lb_trailing_metadata_recv_;
    grpc_status_code lb_call_status_;
    grpc_slice lb_call_status_details_;

    // The stats for client-side load reporting associated with this LB call.
    // Created after the first serverlist is received.
    RefCountedPtr<GrpcLbClientStats> client_stats_;
    grpc_millis client_stats_report_interval_ = 0;
    grpc_timer client_load_report_timer_;
    bool client_load_report_timer_callback_pending_ = false;
    bool last_client_load_report_counters_were_zero_ = false;
    bool client_load_report_is_due_ = false;
    // The closure used for either the load report timer or the callback for
    // completion of sending the load report.
    grpc_closure client_load_report_closure_;
  };

  ~GrpcLb();

  void ShutdownLocked() override;

  // Helper function used in ctor and UpdateLocked().
  void ProcessChannelArgsLocked(const grpc_channel_args& args);

  // Methods for dealing with the balancer channel and call.
  void StartPickingLocked();
  void StartBalancerCallLocked();
  static void OnFallbackTimerLocked(void* arg, grpc_error* error);
  void StartBalancerCallRetryTimerLocked();
  static void OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error);
  static void OnBalancerChannelConnectivityChangedLocked(void* arg,
                                                         grpc_error* error);

  // Pending pick methods.
  static void PendingPickSetMetadataAndContext(PendingPick* pp);
  PendingPick* PendingPickCreate(PickState* pick);
  void AddPendingPick(PendingPick* pp);
  static void OnPendingPickComplete(void* arg, grpc_error* error);

  // Methods for dealing with the RR policy.
  void CreateOrUpdateRoundRobinPolicyLocked();
  grpc_channel_args* CreateRoundRobinPolicyArgsLocked();
  void CreateRoundRobinPolicyLocked(const Args& args);
  bool PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp,
                                      grpc_error** error);
  void UpdateConnectivityStateFromRoundRobinPolicyLocked(
      grpc_error* rr_state_error);
  static void OnRoundRobinConnectivityChangedLocked(void* arg,
                                                    grpc_error* error);
  static void OnRoundRobinRequestReresolutionLocked(void* arg,
                                                    grpc_error* error);

  // Who the client is trying to communicate with.
  const char* server_name_ = nullptr;

  // Current channel args from the resolver.
  grpc_channel_args* args_ = nullptr;

  // Internal state.
  bool started_picking_ = false;
  bool shutting_down_ = false;
  grpc_connectivity_state_tracker state_tracker_;

  // The channel for communicating with the LB server.
  grpc_channel* lb_channel_ = nullptr;
  // Mutex to protect the channel to the LB server. This is used when
  // processing a channelz request.
  gpr_mu lb_channel_mu_;
  grpc_connectivity_state lb_channel_connectivity_;
  grpc_closure lb_channel_on_connectivity_changed_;
  // Are we already watching the LB channel's connectivity?
  bool watching_lb_channel_ = false;
  // Response generator to inject address updates into lb_channel_.
  RefCountedPtr<FakeResolverResponseGenerator> response_generator_;

  // The data associated with the current LB call. It holds a ref to this LB
  // policy. It's initialized every time we query for backends. It's reset to
  // NULL whenever the current LB call is no longer needed (e.g., the LB policy
  // is shutting down, or the LB call has ended). A non-NULL lb_calld_ always
  // contains a non-NULL lb_call_.
  OrphanablePtr<BalancerCallState> lb_calld_;
  // Timeout in milliseconds for the LB call. 0 means no deadline.
  int lb_call_timeout_ms_ = 0;
  // Balancer call retry state.
  BackOff lb_call_backoff_;
  bool retry_timer_callback_pending_ = false;
  grpc_timer lb_call_retry_timer_;
  grpc_closure lb_on_call_retry_;

  // The deserialized response from the balancer. May be nullptr until one
  // such response has arrived.
  grpc_grpclb_serverlist* serverlist_ = nullptr;
  // Index into serverlist for next pick.
  // If the server at this index is a drop, we return a drop.
  // Otherwise, we delegate to the RR policy.
  size_t serverlist_index_ = 0;

  // Timeout in milliseconds before using fallback backend addresses.
  // 0 means not using fallback.
  int lb_fallback_timeout_ms_ = 0;
  // The backend addresses from the resolver.
  grpc_lb_addresses* fallback_backend_addresses_ = nullptr;
  // Fallback timer.
  bool fallback_timer_callback_pending_ = false;
  grpc_timer lb_fallback_timer_;
  grpc_closure lb_on_fallback_;

  // Pending picks that are waiting on the RR policy's connectivity.
  PendingPick* pending_picks_ = nullptr;

  // The RR policy to use for the backends.
  OrphanablePtr<LoadBalancingPolicy> rr_policy_;
  grpc_connectivity_state rr_connectivity_state_;
  grpc_closure on_rr_connectivity_changed_;
  grpc_closure on_rr_request_reresolution_;
};

//
// serverlist parsing code
//

// vtable for LB tokens in grpc_lb_addresses
void* lb_token_copy(void* token) {
  return token == nullptr
             ? nullptr
             : (void*)GRPC_MDELEM_REF(grpc_mdelem{(uintptr_t)token}).payload;
}
void lb_token_destroy(void* token) {
  if (token != nullptr) {
    GRPC_MDELEM_UNREF(grpc_mdelem{(uintptr_t)token});
  }
}
int lb_token_cmp(void* token1, void* token2) {
  if (token1 > token2) return 1;
  if (token1 < token2) return -1;
  return 0;
}
const grpc_lb_user_data_vtable lb_token_vtable = {
    lb_token_copy, lb_token_destroy, lb_token_cmp};

// Returns the backend addresses extracted from the given addresses.
grpc_lb_addresses* ExtractBackendAddresses(const grpc_lb_addresses* addresses) {
  // First pass: count the number of backend addresses.
  size_t num_backends = 0;
  for (size_t i = 0; i < addresses->num_addresses; ++i) {
    if (!addresses->addresses[i].is_balancer) {
      ++num_backends;
    }
  }
  // Second pass: actually populate the addresses and (empty) LB tokens.
  grpc_lb_addresses* backend_addresses =
      grpc_lb_addresses_create(num_backends, &lb_token_vtable);
  size_t num_copied = 0;
  for (size_t i = 0; i < addresses->num_addresses; ++i) {
    if (addresses->addresses[i].is_balancer) continue;
    const grpc_resolved_address* addr = &addresses->addresses[i].address;
    grpc_lb_addresses_set_address(backend_addresses, num_copied, &addr->addr,
                                  addr->len, false /* is_balancer */,
                                  nullptr /* balancer_name */,
                                  (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload);
    ++num_copied;
  }
  return backend_addresses;
}

bool IsServerValid(const grpc_grpclb_server* server, size_t idx, bool log) {
  if (server->drop) return false;
  const grpc_grpclb_ip_address* ip = &server->ip_address;
  if (GPR_UNLIKELY(server->port >> 16 != 0)) {
    if (log) {
      gpr_log(GPR_ERROR,
              "Invalid port '%d' at index %lu of serverlist. Ignoring.",
              server->port, (unsigned long)idx);
    }
    return false;
  }
  if (GPR_UNLIKELY(ip->size != 4 && ip->size != 16)) {
    if (log) {
      gpr_log(GPR_ERROR,
              "Expected IP to be 4 or 16 bytes, got %d at index %lu of "
              "serverlist. Ignoring",
              ip->size, (unsigned long)idx);
    }
    return false;
  }
  return true;
}

void ParseServer(const grpc_grpclb_server* server,
                 grpc_resolved_address* addr) {
  memset(addr, 0, sizeof(*addr));
  if (server->drop) return;
  const uint16_t netorder_port = grpc_htons((uint16_t)server->port);
  /* the addresses are given in binary format (an in(6)_addr struct) in
   * server->ip_address.bytes. */
  const grpc_grpclb_ip_address* ip = &server->ip_address;
  if (ip->size == 4) {
    addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in));
    grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(&addr->addr);
    addr4->sin_family = GRPC_AF_INET;
    memcpy(&addr4->sin_addr, ip->bytes, ip->size);
    addr4->sin_port = netorder_port;
  } else if (ip->size == 16) {
    addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in6));
    grpc_sockaddr_in6* addr6 = (grpc_sockaddr_in6*)&addr->addr;
    addr6->sin6_family = GRPC_AF_INET6;
    memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
    addr6->sin6_port = netorder_port;
  }
}

// Returns addresses extracted from \a serverlist.
grpc_lb_addresses* ProcessServerlist(const grpc_grpclb_serverlist* serverlist) {
  size_t num_valid = 0;
  /* first pass: count how many are valid in order to allocate the necessary
   * memory in a single block */
  for (size_t i = 0; i < serverlist->num_servers; ++i) {
    if (IsServerValid(serverlist->servers[i], i, true)) ++num_valid;
  }
  grpc_lb_addresses* lb_addresses =
      grpc_lb_addresses_create(num_valid, &lb_token_vtable);
  /* second pass: actually populate the addresses and LB tokens (aka user data
   * to the outside world) to be read by the RR policy during its creation.
   * Given that the validity tests are very cheap, they are performed again
   * instead of marking the valid ones during the first pass, as this would
   * incur an allocation due to the arbitrary number of servers */
  size_t addr_idx = 0;
  for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) {
    const grpc_grpclb_server* server = serverlist->servers[sl_idx];
    if (!IsServerValid(serverlist->servers[sl_idx], sl_idx, false)) continue;
    GPR_ASSERT(addr_idx < num_valid);
    /* address processing */
    grpc_resolved_address addr;
    ParseServer(server, &addr);
    /* lb token processing */
    void* user_data;
    if (server->has_load_balance_token) {
      const size_t lb_token_max_length =
          GPR_ARRAY_SIZE(server->load_balance_token);
      const size_t lb_token_length =
          strnlen(server->load_balance_token, lb_token_max_length);
      grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer(
          server->load_balance_token, lb_token_length);
      user_data =
          (void*)grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr)
              .payload;
    } else {
      char* uri = grpc_sockaddr_to_uri(&addr);
      gpr_log(GPR_INFO,
              "Missing LB token for backend address '%s'. The empty token will "
              "be used instead",
              uri);
      gpr_free(uri);
      user_data = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
    }
    grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len,
                                  false /* is_balancer */,
                                  nullptr /* balancer_name */, user_data);
    ++addr_idx;
  }
  GPR_ASSERT(addr_idx == num_valid);
  return lb_addresses;
}

//
// GrpcLb::BalancerCallState
//

GrpcLb::BalancerCallState::BalancerCallState(
    RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy)
    : InternallyRefCountedWithTracing<BalancerCallState>(&grpc_lb_glb_trace),
      grpclb_policy_(std::move(parent_grpclb_policy)) {
  GPR_ASSERT(grpclb_policy_ != nullptr);
  GPR_ASSERT(!grpclb_policy()->shutting_down_);
  // Init the LB call. Note that the LB call will progress every time there's
  // activity in grpclb_policy_->interested_parties(), which is comprised of
  // the polling entities from client_channel.
  GPR_ASSERT(grpclb_policy()->server_name_ != nullptr);
  GPR_ASSERT(grpclb_policy()->server_name_[0] != '\0');
  const grpc_millis deadline =
      grpclb_policy()->lb_call_timeout_ms_ == 0
          ? GRPC_MILLIS_INF_FUTURE
          : ExecCtx::Get()->Now() + grpclb_policy()->lb_call_timeout_ms_;
  lb_call_ = grpc_channel_create_pollset_set_call(
      grpclb_policy()->lb_channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
      grpclb_policy_->interested_parties(),
      GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD,
      nullptr, deadline, nullptr);
  // Init the LB call request payload.
  grpc_grpclb_request* request =
      grpc_grpclb_request_create(grpclb_policy()->server_name_);
  grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
  send_message_payload_ =
      grpc_raw_byte_buffer_create(&request_payload_slice, 1);
  grpc_slice_unref_internal(request_payload_slice);
  grpc_grpclb_request_destroy(request);
  // Init other data associated with the LB call.
  grpc_metadata_array_init(&lb_initial_metadata_recv_);
  grpc_metadata_array_init(&lb_trailing_metadata_recv_);
  GRPC_CLOSURE_INIT(&lb_on_initial_request_sent_, OnInitialRequestSentLocked,
                    this, grpc_combiner_scheduler(grpclb_policy()->combiner()));
  GRPC_CLOSURE_INIT(&lb_on_balancer_message_received_,
                    OnBalancerMessageReceivedLocked, this,
                    grpc_combiner_scheduler(grpclb_policy()->combiner()));
  GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_,
                    OnBalancerStatusReceivedLocked, this,
                    grpc_combiner_scheduler(grpclb_policy()->combiner()));
}

GrpcLb::BalancerCallState::~BalancerCallState() {
  GPR_ASSERT(lb_call_ != nullptr);
  grpc_call_unref(lb_call_);
  grpc_metadata_array_destroy(&lb_initial_metadata_recv_);
  grpc_metadata_array_destroy(&lb_trailing_metadata_recv_);
  grpc_byte_buffer_destroy(send_message_payload_);
  grpc_byte_buffer_destroy(recv_message_payload_);
  grpc_slice_unref_internal(lb_call_status_details_);
}

void GrpcLb::BalancerCallState::Orphan() {
  GPR_ASSERT(lb_call_ != nullptr);
  // If we are here because grpclb_policy wants to cancel the call,
  // lb_on_balancer_status_received_ will complete the cancellation and clean
  // up. Otherwise, we are here because grpclb_policy has to orphan a failed
  // call, in which case the following cancellation is a no-op.
  grpc_call_cancel(lb_call_, nullptr);
  if (client_load_report_timer_callback_pending_) {
    grpc_timer_cancel(&client_load_report_timer_);
  }
  // Note that the initial ref is held by lb_on_balancer_status_received_
  // instead of the caller of this function. So the corresponding unref happens
  // in lb_on_balancer_status_received_ instead of here.
}

void GrpcLb::BalancerCallState::StartQuery() {
  GPR_ASSERT(lb_call_ != nullptr);
  if (grpc_lb_glb_trace.enabled()) {
    gpr_log(GPR_INFO,
            "[grpclb %p] Starting LB call (lb_calld: %p, lb_call: %p)",
            grpclb_policy_.get(), this, lb_call_);
  }
  // Create the ops.
  grpc_call_error call_error;
  grpc_op ops[3];
  memset(ops, 0, sizeof(ops));
  // Op: send initial metadata.
  grpc_op* op = ops;
  op->op = GRPC_OP_SEND_INITIAL_METADATA;
  op->data.send_initial_metadata.count = 0;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // Op: send request message.
  GPR_ASSERT(send_message_payload_ != nullptr);
  op->op = GRPC_OP_SEND_MESSAGE;
  op->data.send_message.send_message = send_message_payload_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // TODO(roth): We currently track this ref manually. Once the
  // ClosureRef API is ready, we should pass the RefCountedPtr<> along
  // with the callback.
  auto self = Ref(DEBUG_LOCATION, "on_initial_request_sent");
  self.release();
  call_error = grpc_call_start_batch_and_execute(
      lb_call_, ops, (size_t)(op - ops), &lb_on_initial_request_sent_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
  // Op: recv initial metadata.
  op = ops;
  op->op = GRPC_OP_RECV_INITIAL_METADATA;
  op->data.recv_initial_metadata.recv_initial_metadata =
      &lb_initial_metadata_recv_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // Op: recv response.
  op->op = GRPC_OP_RECV_MESSAGE;
  op->data.recv_message.recv_message = &recv_message_payload_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // TODO(roth): We currently track this ref manually. Once the
  // ClosureRef API is ready, we should pass the RefCountedPtr<> along
  // with the callback.
  self = Ref(DEBUG_LOCATION, "on_message_received");
  self.release();
  call_error = grpc_call_start_batch_and_execute(
      lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_message_received_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
  // Op: recv server status.
  op = ops;
  op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
  op->data.recv_status_on_client.trailing_metadata =
      &lb_trailing_metadata_recv_;
  op->data.recv_status_on_client.status = &lb_call_status_;
  op->data.recv_status_on_client.status_details = &lb_call_status_details_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // This callback signals the end of the LB call, so it relies on the initial
  // ref instead of a new ref. When it's invoked, it's the initial ref that is
  // unreffed.
  call_error = grpc_call_start_batch_and_execute(
      lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_status_received_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
}

void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
  const grpc_millis next_client_load_report_time =
      ExecCtx::Get()->Now() + client_stats_report_interval_;
  GRPC_CLOSURE_INIT(&client_load_report_closure_,
                    MaybeSendClientLoadReportLocked, this,
                    grpc_combiner_scheduler(grpclb_policy()->combiner()));
  grpc_timer_init(&client_load_report_timer_, next_client_load_report_time,
                  &client_load_report_closure_);
  client_load_report_timer_callback_pending_ = true;
}

void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked(
    void* arg, grpc_error* error) {
  BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
  GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
  lb_calld->client_load_report_timer_callback_pending_ = false;
  if (error != GRPC_ERROR_NONE || lb_calld != grpclb_policy->lb_calld_.get()) {
    lb_calld->Unref(DEBUG_LOCATION, "client_load_report");
    return;
  }
  // If we've already sent the initial request, then we can go ahead and send
  // the load report. Otherwise, we need to wait until the initial request has
  // been sent to send this (see OnInitialRequestSentLocked()).
  if (lb_calld->send_message_payload_ == nullptr) {
    lb_calld->SendClientLoadReportLocked();
  } else {
    lb_calld->client_load_report_is_due_ = true;
  }
}

bool GrpcLb::BalancerCallState::LoadReportCountersAreZero(
    grpc_grpclb_request* request) {
  GrpcLbClientStats::DroppedCallCounts* drop_entries =
      static_cast<GrpcLbClientStats::DroppedCallCounts*>(
          request->client_stats.calls_finished_with_drop.arg);
  return request->client_stats.num_calls_started == 0 &&
         request->client_stats.num_calls_finished == 0 &&
         request->client_stats.num_calls_finished_with_client_failed_to_send ==
             0 &&
         request->client_stats.num_calls_finished_known_received == 0 &&
         (drop_entries == nullptr || drop_entries->size() == 0);
}

void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
  // Construct message payload.
  GPR_ASSERT(send_message_payload_ == nullptr);
  grpc_grpclb_request* request =
      grpc_grpclb_load_report_request_create_locked(client_stats_.get());
  // Skip client load report if the counters were all zero in the last
  // report and they are still zero in this one.
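  // (This avoids repeatedly sending reports that carry no new information
  // while the channel is idle.)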
  if (LoadReportCountersAreZero(request)) {
    if (last_client_load_report_counters_were_zero_) {
      grpc_grpclb_request_destroy(request);
      ScheduleNextClientLoadReportLocked();
      return;
    }
    last_client_load_report_counters_were_zero_ = true;
  } else {
    last_client_load_report_counters_were_zero_ = false;
  }
  grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
  send_message_payload_ =
      grpc_raw_byte_buffer_create(&request_payload_slice, 1);
  grpc_slice_unref_internal(request_payload_slice);
  grpc_grpclb_request_destroy(request);
  // Send the report.
  grpc_op op;
  memset(&op, 0, sizeof(op));
  op.op = GRPC_OP_SEND_MESSAGE;
  op.data.send_message.send_message = send_message_payload_;
  GRPC_CLOSURE_INIT(&client_load_report_closure_, ClientLoadReportDoneLocked,
                    this, grpc_combiner_scheduler(grpclb_policy()->combiner()));
  grpc_call_error call_error = grpc_call_start_batch_and_execute(
      lb_call_, &op, 1, &client_load_report_closure_);
  if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
    gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", grpclb_policy_.get(),
            call_error);
    GPR_ASSERT(GRPC_CALL_OK == call_error);
  }
}

void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked(void* arg,
                                                           grpc_error* error) {
  BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
  GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
  grpc_byte_buffer_destroy(lb_calld->send_message_payload_);
  lb_calld->send_message_payload_ = nullptr;
  if (error != GRPC_ERROR_NONE || lb_calld != grpclb_policy->lb_calld_.get()) {
    lb_calld->Unref(DEBUG_LOCATION, "client_load_report");
    return;
  }
  lb_calld->ScheduleNextClientLoadReportLocked();
}

void GrpcLb::BalancerCallState::OnInitialRequestSentLocked(void* arg,
                                                           grpc_error* error) {
  BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
  grpc_byte_buffer_destroy(lb_calld->send_message_payload_);
  lb_calld->send_message_payload_ = nullptr;
  // If we attempted to send a client load report before the initial request
  // was sent (and this lb_calld is still in use), send the load report now.
  if (lb_calld->client_load_report_is_due_ &&
      lb_calld == lb_calld->grpclb_policy()->lb_calld_.get()) {
    lb_calld->SendClientLoadReportLocked();
    lb_calld->client_load_report_is_due_ = false;
  }
  lb_calld->Unref(DEBUG_LOCATION, "on_initial_request_sent");
}

void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
    void* arg, grpc_error* error) {
  BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
  GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
  // Empty payload means the LB call was cancelled.
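  // (The check below also returns early if this lb_calld has been superseded
  // by a newer call; in both cases we just drop the ref taken for this
  // callback.)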
  if (lb_calld != grpclb_policy->lb_calld_.get() ||
      lb_calld->recv_message_payload_ == nullptr) {
    lb_calld->Unref(DEBUG_LOCATION, "on_message_received");
    return;
  }
  grpc_byte_buffer_reader bbr;
  grpc_byte_buffer_reader_init(&bbr, lb_calld->recv_message_payload_);
  grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
  grpc_byte_buffer_reader_destroy(&bbr);
  grpc_byte_buffer_destroy(lb_calld->recv_message_payload_);
  lb_calld->recv_message_payload_ = nullptr;
  grpc_grpclb_initial_response* initial_response;
  grpc_grpclb_serverlist* serverlist;
  if (!lb_calld->seen_initial_response_ &&
      (initial_response = grpc_grpclb_initial_response_parse(response_slice)) !=
          nullptr) {
    // Have NOT seen initial response, look for initial response.
    if (initial_response->has_client_stats_report_interval) {
      lb_calld->client_stats_report_interval_ = GPR_MAX(
          GPR_MS_PER_SEC, grpc_grpclb_duration_to_millis(
                              &initial_response->client_stats_report_interval));
      if (grpc_lb_glb_trace.enabled()) {
        gpr_log(GPR_INFO,
                "[grpclb %p] Received initial LB response message; "
                "client load reporting interval = %" PRId64 " milliseconds",
                grpclb_policy, lb_calld->client_stats_report_interval_);
      }
    } else if (grpc_lb_glb_trace.enabled()) {
      gpr_log(GPR_INFO,
              "[grpclb %p] Received initial LB response message; client load "
              "reporting NOT enabled",
              grpclb_policy);
    }
    grpc_grpclb_initial_response_destroy(initial_response);
    lb_calld->seen_initial_response_ = true;
  } else if ((serverlist = grpc_grpclb_response_parse_serverlist(
                  response_slice)) != nullptr) {
    // Have seen initial response, look for serverlist.
    GPR_ASSERT(lb_calld->lb_call_ != nullptr);
    if (grpc_lb_glb_trace.enabled()) {
      gpr_log(GPR_INFO,
              "[grpclb %p] Serverlist with %" PRIuPTR " servers received",
              grpclb_policy, serverlist->num_servers);
      for (size_t i = 0; i < serverlist->num_servers; ++i) {
        grpc_resolved_address addr;
        ParseServer(serverlist->servers[i], &addr);
        char* ipport;
        grpc_sockaddr_to_string(&ipport, &addr, false);
        gpr_log(GPR_INFO, "[grpclb %p] Serverlist[%" PRIuPTR "]: %s",
                grpclb_policy, i, ipport);
        gpr_free(ipport);
      }
    }
    /* update serverlist */
    if (serverlist->num_servers > 0) {
      // Start sending client load report only after we start using the
      // serverlist returned from the current LB call.
      if (lb_calld->client_stats_report_interval_ > 0 &&
          lb_calld->client_stats_ == nullptr) {
        lb_calld->client_stats_.reset(New<GrpcLbClientStats>());
        // TODO(roth): We currently track this ref manually. Once the
        // ClosureRef API is ready, we should pass the RefCountedPtr<> along
        // with the callback.
        auto self = lb_calld->Ref(DEBUG_LOCATION, "client_load_report");
        self.release();
        lb_calld->ScheduleNextClientLoadReportLocked();
      }
      if (grpc_grpclb_serverlist_equals(grpclb_policy->serverlist_,
                                        serverlist)) {
        if (grpc_lb_glb_trace.enabled()) {
          gpr_log(GPR_INFO,
                  "[grpclb %p] Incoming server list identical to current, "
                  "ignoring.",
                  grpclb_policy);
        }
        grpc_grpclb_destroy_serverlist(serverlist);
      } else { /* new serverlist */
        if (grpclb_policy->serverlist_ != nullptr) {
          /* dispose of the old serverlist */
          grpc_grpclb_destroy_serverlist(grpclb_policy->serverlist_);
        } else {
          /* or dispose of the fallback */
          grpc_lb_addresses_destroy(grpclb_policy->fallback_backend_addresses_);
          grpclb_policy->fallback_backend_addresses_ = nullptr;
          if (grpclb_policy->fallback_timer_callback_pending_) {
            grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
          }
        }
        // and update the copy in the GrpcLb instance. This
        // serverlist instance will be destroyed either upon the next
        // update or when the GrpcLb instance is destroyed.
        grpclb_policy->serverlist_ = serverlist;
        grpclb_policy->serverlist_index_ = 0;
        grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked();
      }
    } else {
      if (grpc_lb_glb_trace.enabled()) {
        gpr_log(GPR_INFO, "[grpclb %p] Received empty server list, ignoring.",
                grpclb_policy);
      }
      grpc_grpclb_destroy_serverlist(serverlist);
    }
  } else {
    // No valid initial response or serverlist found.
    gpr_log(GPR_ERROR,
            "[grpclb %p] Invalid LB response received: '%s'. Ignoring.",
            grpclb_policy,
            grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX));
  }
  grpc_slice_unref_internal(response_slice);
  if (!grpclb_policy->shutting_down_) {
    // Keep listening for serverlist updates.
    grpc_op op;
    memset(&op, 0, sizeof(op));
    op.op = GRPC_OP_RECV_MESSAGE;
    op.data.recv_message.recv_message = &lb_calld->recv_message_payload_;
    op.flags = 0;
    op.reserved = nullptr;
    // Reuse the "OnBalancerMessageReceivedLocked" ref taken in StartQuery().
    const grpc_call_error call_error = grpc_call_start_batch_and_execute(
        lb_calld->lb_call_, &op, 1,
        &lb_calld->lb_on_balancer_message_received_);
    GPR_ASSERT(GRPC_CALL_OK == call_error);
  } else {
    lb_calld->Unref(DEBUG_LOCATION, "on_message_received+grpclb_shutdown");
  }
}

void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
    void* arg, grpc_error* error) {
  BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
  GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
  GPR_ASSERT(lb_calld->lb_call_ != nullptr);
  if (grpc_lb_glb_trace.enabled()) {
    char* status_details =
        grpc_slice_to_c_string(lb_calld->lb_call_status_details_);
    gpr_log(GPR_INFO,
            "[grpclb %p] Status from LB server received. Status = %d, details "
            "= '%s', (lb_calld: %p, lb_call: %p), error '%s'",
            grpclb_policy, lb_calld->lb_call_status_, status_details, lb_calld,
            lb_calld->lb_call_, grpc_error_string(error));
    gpr_free(status_details);
  }
  grpclb_policy->TryReresolutionLocked(&grpc_lb_glb_trace, GRPC_ERROR_NONE);
  // If this lb_calld is still in use, this call ended because of a failure so
  // we want to retry connecting. Otherwise, we have deliberately ended this
  // call and no further action is required.
  if (lb_calld == grpclb_policy->lb_calld_.get()) {
    grpclb_policy->lb_calld_.reset();
    GPR_ASSERT(!grpclb_policy->shutting_down_);
    if (lb_calld->seen_initial_response_) {
      // If we lose connection to the LB server, reset the backoff and restart
      // the LB call immediately.
      grpclb_policy->lb_call_backoff_.Reset();
      grpclb_policy->StartBalancerCallLocked();
    } else {
      // If this LB call fails establishing any connection to the LB server,
      // retry later.
      grpclb_policy->StartBalancerCallRetryTimerLocked();
    }
  }
  lb_calld->Unref(DEBUG_LOCATION, "lb_call_ended");
}

//
// helper code for creating balancer channel
//

grpc_lb_addresses* ExtractBalancerAddresses(
    const grpc_lb_addresses* addresses) {
  size_t num_grpclb_addrs = 0;
  for (size_t i = 0; i < addresses->num_addresses; ++i) {
    if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
  }
  // There must be at least one balancer address, or else the
  // client_channel would not have chosen this LB policy.
  GPR_ASSERT(num_grpclb_addrs > 0);
  grpc_lb_addresses* lb_addresses =
      grpc_lb_addresses_create(num_grpclb_addrs, nullptr);
  size_t lb_addresses_idx = 0;
  for (size_t i = 0; i < addresses->num_addresses; ++i) {
    if (!addresses->addresses[i].is_balancer) continue;
    if (GPR_UNLIKELY(addresses->addresses[i].user_data != nullptr)) {
      gpr_log(GPR_ERROR,
              "This LB policy doesn't support user data. It will be ignored");
    }
    grpc_lb_addresses_set_address(
        lb_addresses, lb_addresses_idx++, addresses->addresses[i].address.addr,
        addresses->addresses[i].address.len, false /* is balancer */,
        addresses->addresses[i].balancer_name, nullptr /* user data */);
  }
  GPR_ASSERT(num_grpclb_addrs == lb_addresses_idx);
  return lb_addresses;
}

/* Returns the channel args for the LB channel, used to create a bidirectional
 * stream for the reception of load balancing updates.
 *
 * Inputs:
 *   - \a addresses: corresponding to the balancers.
 *   - \a response_generator: in order to propagate updates from the resolver
 *     above the grpclb policy.
 *   - \a args: other args inherited from the grpclb policy. */
grpc_channel_args* BuildBalancerChannelArgs(
    const grpc_lb_addresses* addresses,
    FakeResolverResponseGenerator* response_generator,
    const grpc_channel_args* args) {
  grpc_lb_addresses* lb_addresses = ExtractBalancerAddresses(addresses);
  // Channel args to remove.
  static const char* args_to_remove[] = {
      // LB policy name, since we want to use the default (pick_first) in
      // the LB channel.
      GRPC_ARG_LB_POLICY_NAME,
      // The channel arg for the server URI, since that will be different for
      // the LB channel than for the parent channel. The client channel
      // factory will re-add this arg with the right value.
      GRPC_ARG_SERVER_URI,
      // The resolved addresses, which will be generated by the name resolver
      // used in the LB channel. Note that the LB channel will use the fake
      // resolver, so this won't actually generate a query to DNS (or some
      // other name service). However, the addresses returned by the fake
      // resolver will have is_balancer=false, whereas our own addresses have
      // is_balancer=true. We need the LB channel to return addresses with
      // is_balancer=false so that it does not wind up recursively using the
      // grpclb LB policy, as per the special case logic in client_channel.c.
      GRPC_ARG_LB_ADDRESSES,
      // The fake resolver response generator, because we are replacing it
      // with the one from the grpclb policy, used to propagate updates to
      // the LB channel.
      GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
      // The LB channel should use the authority indicated by the target
      // authority table (see \a grpc_lb_policy_grpclb_modify_lb_channel_args),
      // as opposed to the authority from the parent channel.
      GRPC_ARG_DEFAULT_AUTHORITY,
      // Just as for \a GRPC_ARG_DEFAULT_AUTHORITY, the LB channel should be
      // treated as a stand-alone channel and not inherit this argument from
      // the args of the parent channel.
      GRPC_SSL_TARGET_NAME_OVERRIDE_ARG,
  };
  // Channel args to add.
  const grpc_arg args_to_add[] = {
      // New LB addresses.
      // Note that we pass these in both when creating the LB channel
      // and via the fake resolver. The latter is what actually gets used.
      grpc_lb_addresses_create_channel_arg(lb_addresses),
      // The fake resolver response generator, which we use to inject
      // address updates into the LB channel.
      grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
          response_generator),
      // A channel arg indicating the target is a grpclb load balancer.
      grpc_channel_arg_integer_create(
          const_cast<char*>(GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER), 1),
      // A channel arg indicating this is an internal channel, i.e., it is
      // owned by components in Core, not by the user application.
      grpc_channel_arg_integer_create(
          const_cast<char*>(GRPC_ARG_CHANNELZ_CHANNEL_IS_INTERNAL_CHANNEL), 1),
  };
  // Construct channel args.
  grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
      args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add,
      GPR_ARRAY_SIZE(args_to_add));
  // Make any necessary modifications for security.
  new_args = grpc_lb_policy_grpclb_modify_lb_channel_args(new_args);
  // Clean up.
  grpc_lb_addresses_destroy(lb_addresses);
  return new_args;
}

//
// ctor and dtor
//

GrpcLb::GrpcLb(const grpc_lb_addresses* addresses,
               const LoadBalancingPolicy::Args& args)
    : LoadBalancingPolicy(args),
      response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()),
      lb_call_backoff_(
          BackOff::Options()
              .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS *
                                   1000)
              .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
              .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
              .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS *
                               1000)) {
  // Initialization.
  gpr_mu_init(&lb_channel_mu_);
  grpc_subchannel_index_ref();
  GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
                    &GrpcLb::OnBalancerChannelConnectivityChangedLocked, this,
                    grpc_combiner_scheduler(args.combiner));
  GRPC_CLOSURE_INIT(&on_rr_connectivity_changed_,
                    &GrpcLb::OnRoundRobinConnectivityChangedLocked, this,
                    grpc_combiner_scheduler(args.combiner));
  GRPC_CLOSURE_INIT(&on_rr_request_reresolution_,
                    &GrpcLb::OnRoundRobinRequestReresolutionLocked, this,
                    grpc_combiner_scheduler(args.combiner));
  grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, "grpclb");
  // Record server name.
  const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI);
  const char* server_uri = grpc_channel_arg_get_string(arg);
  GPR_ASSERT(server_uri != nullptr);
  grpc_uri* uri = grpc_uri_parse(server_uri, true);
  GPR_ASSERT(uri->path[0] != '\0');
  server_name_ = gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
  if (grpc_lb_glb_trace.enabled()) {
    gpr_log(GPR_INFO,
            "[grpclb %p] Will use '%s' as the server name for LB request.",
            this, server_name_);
  }
  grpc_uri_destroy(uri);
  // Record LB call timeout.
  arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS);
  lb_call_timeout_ms_ = grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX});
  // Record fallback timeout.
  arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS);
  lb_fallback_timeout_ms_ = grpc_channel_arg_get_integer(
      arg, {GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX});
  // Process channel args.
  ProcessChannelArgsLocked(*args.args);
}

GrpcLb::~GrpcLb() {
  GPR_ASSERT(pending_picks_ == nullptr);
  gpr_mu_destroy(&lb_channel_mu_);
  gpr_free((void*)server_name_);
  grpc_channel_args_destroy(args_);
  grpc_connectivity_state_destroy(&state_tracker_);
  if (serverlist_ != nullptr) {
    grpc_grpclb_destroy_serverlist(serverlist_);
  }
  if (fallback_backend_addresses_ != nullptr) {
    grpc_lb_addresses_destroy(fallback_backend_addresses_);
  }
  grpc_subchannel_index_unref();
}

void GrpcLb::ShutdownLocked() {
  grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
  shutting_down_ = true;
  lb_calld_.reset();
  if (retry_timer_callback_pending_) {
    grpc_timer_cancel(&lb_call_retry_timer_);
  }
  if (fallback_timer_callback_pending_) {
    grpc_timer_cancel(&lb_fallback_timer_);
  }
  rr_policy_.reset();
  TryReresolutionLocked(&grpc_lb_glb_trace, GRPC_ERROR_CANCELLED);
  // We destroy the LB channel here instead of in our destructor because
  // destroying the channel triggers a last callback to
  // OnBalancerChannelConnectivityChangedLocked(), and we need to be
  // alive when that callback is invoked.
  if (lb_channel_ != nullptr) {
    gpr_mu_lock(&lb_channel_mu_);
    grpc_channel_destroy(lb_channel_);
    lb_channel_ = nullptr;
    gpr_mu_unlock(&lb_channel_mu_);
  }
  grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
                              GRPC_ERROR_REF(error), "grpclb_shutdown");
  // Clear pending picks.
  PendingPick* pp;
  while ((pp = pending_picks_) != nullptr) {
    pending_picks_ = pp->next;
    pp->pick->connected_subchannel.reset();
    // Note: pp is deleted in this callback.
    GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error));
  }
  GRPC_ERROR_UNREF(error);
}

//
// public methods
//

void GrpcLb::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
  PendingPick* pp;
  while ((pp = pending_picks_) != nullptr) {
    pending_picks_ = pp->next;
    pp->pick->on_complete = pp->original_on_complete;
    pp->pick->user_data = nullptr;
    grpc_error* error = GRPC_ERROR_NONE;
    if (new_policy->PickLocked(pp->pick, &error)) {
      // Synchronous return; schedule closure.
      GRPC_CLOSURE_SCHED(pp->pick->on_complete, error);
    }
    Delete(pp);
  }
}

// Cancel a specific pending pick.
//
// A grpclb pick progresses as follows:
// - If there's a Round Robin policy (rr_policy_) available, it'll be
//   handed over to the RR policy (in CreateRoundRobinPolicyLocked()). From
//   that point onwards, it'll be RR's responsibility. For cancellations, that
//   implies the pick also needs to be cancelled by the RR instance.
// - Otherwise, without an RR instance, picks stay pending at this policy's
//   level (grpclb), inside the pending_picks_ list. To cancel these,
//   we invoke the completion closure and set the pick's connected
//   subchannel to nullptr right here.
void GrpcLb::CancelPickLocked(PickState* pick, grpc_error* error) {
  PendingPick* pp = pending_picks_;
  pending_picks_ = nullptr;
  while (pp != nullptr) {
    PendingPick* next = pp->next;
    if (pp->pick == pick) {
      pick->connected_subchannel.reset();
      // Note: pp is deleted in this callback.
      GRPC_CLOSURE_SCHED(&pp->on_complete,
                         GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                             "Pick Cancelled", &error, 1));
    } else {
      pp->next = pending_picks_;
      pending_picks_ = pp;
    }
    pp = next;
  }
  if (rr_policy_ != nullptr) {
    rr_policy_->CancelPickLocked(pick, GRPC_ERROR_REF(error));
  }
  GRPC_ERROR_UNREF(error);
}

// Cancel all pending picks.
//
// A grpclb pick progresses as follows:
// - If there's a Round Robin policy (rr_policy_) available, it'll be
//   handed over to the RR policy (in CreateRoundRobinPolicyLocked()). From
//   that point onwards, it'll be RR's responsibility. For cancellations, that
//   implies the pick also needs to be cancelled by the RR instance.
// - Otherwise, without an RR instance, picks stay pending at this policy's
//   level (grpclb), inside the pending_picks_ list. To cancel these,
//   we invoke the completion closure and set the pick's connected
//   subchannel to nullptr right here.
void GrpcLb::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
                                       uint32_t initial_metadata_flags_eq,
                                       grpc_error* error) {
  PendingPick* pp = pending_picks_;
  pending_picks_ = nullptr;
  while (pp != nullptr) {
    PendingPick* next = pp->next;
    if ((pp->pick->initial_metadata_flags & initial_metadata_flags_mask) ==
        initial_metadata_flags_eq) {
      // Note: pp is deleted in this callback.
      GRPC_CLOSURE_SCHED(&pp->on_complete,
                         GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                             "Pick Cancelled", &error, 1));
    } else {
      pp->next = pending_picks_;
      pending_picks_ = pp;
    }
    pp = next;
  }
  if (rr_policy_ != nullptr) {
    rr_policy_->CancelMatchingPicksLocked(initial_metadata_flags_mask,
                                          initial_metadata_flags_eq,
                                          GRPC_ERROR_REF(error));
  }
  GRPC_ERROR_UNREF(error);
}

void GrpcLb::ExitIdleLocked() {
  if (!started_picking_) {
    StartPickingLocked();
  }
}

void GrpcLb::ResetBackoffLocked() {
  if (lb_channel_ != nullptr) {
    grpc_channel_reset_connect_backoff(lb_channel_);
  }
  if (rr_policy_ != nullptr) {
    rr_policy_->ResetBackoffLocked();
  }
}

bool GrpcLb::PickLocked(PickState* pick, grpc_error** error) {
  PendingPick* pp = PendingPickCreate(pick);
  bool pick_done = false;
  if (rr_policy_ != nullptr) {
    if (grpc_lb_glb_trace.enabled()) {
      gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", this,
              rr_policy_.get());
    }
    pick_done =
        PickFromRoundRobinPolicyLocked(false /* force_async */, pp, error);
  } else {  // rr_policy_ == NULL
    if (pick->on_complete == nullptr) {
      *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
          "No pick result available but synchronous result required.");
      pick_done = true;
    } else {
      if (grpc_lb_glb_trace.enabled()) {
        gpr_log(GPR_INFO,
                "[grpclb %p] No RR policy. Adding to grpclb's pending picks",
                this);
      }
      AddPendingPick(pp);
      if (!started_picking_) {
        StartPickingLocked();
      }
      pick_done = false;
    }
  }
  return pick_done;
}

void GrpcLb::FillChildRefsForChannelz(ChildRefsList* child_subchannels,
                                      ChildRefsList* child_channels) {
  // Delegate to the RoundRobin policy to fill in the child subchannels.
  rr_policy_->FillChildRefsForChannelz(child_subchannels, child_channels);
  MutexLock lock(&lb_channel_mu_);
  if (lb_channel_ != nullptr) {
    grpc_core::channelz::ChannelNode* channel_node =
        grpc_channel_get_channelz_node(lb_channel_);
    if (channel_node != nullptr) {
      child_channels->push_back(channel_node->uuid());
    }
  }
}

grpc_connectivity_state GrpcLb::CheckConnectivityLocked(
    grpc_error** connectivity_error) {
  return grpc_connectivity_state_get(&state_tracker_, connectivity_error);
}

void GrpcLb::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
                                       grpc_closure* notify) {
  grpc_connectivity_state_notify_on_state_change(&state_tracker_, current,
                                                 notify);
}

void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
  const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
  if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) {
    // Ignore this update.
    gpr_log(
        GPR_ERROR,
        "[grpclb %p] No valid LB addresses channel arg in update, ignoring.",
        this);
    return;
  }
  const grpc_lb_addresses* addresses =
      static_cast<const grpc_lb_addresses*>(arg->value.pointer.p);
  // Update fallback address list.
  if (fallback_backend_addresses_ != nullptr) {
    grpc_lb_addresses_destroy(fallback_backend_addresses_);
  }
  fallback_backend_addresses_ = ExtractBackendAddresses(addresses);
  // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
  // since we use this to trigger the client_load_reporting filter.
  static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
  grpc_arg new_arg = grpc_channel_arg_string_create(
      (char*)GRPC_ARG_LB_POLICY_NAME, (char*)"grpclb");
  grpc_channel_args_destroy(args_);
  args_ = grpc_channel_args_copy_and_add_and_remove(
      &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
  // Construct args for balancer channel.
  grpc_channel_args* lb_channel_args =
      BuildBalancerChannelArgs(addresses, response_generator_.get(), &args);
  // Create balancer channel if needed.
  if (lb_channel_ == nullptr) {
    char* uri_str;
    gpr_asprintf(&uri_str, "fake:///%s", server_name_);
    gpr_mu_lock(&lb_channel_mu_);
    lb_channel_ = grpc_client_channel_factory_create_channel(
        client_channel_factory(), uri_str,
        GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, lb_channel_args);
    gpr_mu_unlock(&lb_channel_mu_);
    GPR_ASSERT(lb_channel_ != nullptr);
    gpr_free(uri_str);
  }
  // Propagate updates to the LB channel (pick_first) through the fake
  // resolver.
  response_generator_->SetResponse(lb_channel_args);
  grpc_channel_args_destroy(lb_channel_args);
}

void GrpcLb::UpdateLocked(const grpc_channel_args& args) {
  ProcessChannelArgsLocked(args);
  // If fallback is configured and the RR policy already exists, update
  // it with the new fallback addresses.
  if (lb_fallback_timeout_ms_ > 0 && rr_policy_ != nullptr) {
    CreateOrUpdateRoundRobinPolicyLocked();
  }
  // Start watching the LB channel connectivity for connection, if not
  // already doing so.
  if (!watching_lb_channel_) {
    lb_channel_connectivity_ = grpc_channel_check_connectivity_state(
        lb_channel_, true /* try to connect */);
    grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
        grpc_channel_get_channel_stack(lb_channel_));
    GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
    watching_lb_channel_ = true;
    // TODO(roth): We currently track this ref manually. Once the
    // ClosureRef API is ready, we should pass the RefCountedPtr<> along
    // with the callback.
    auto self = Ref(DEBUG_LOCATION, "watch_lb_channel_connectivity");
    self.release();
    grpc_client_channel_watch_connectivity_state(
        client_channel_elem,
        grpc_polling_entity_create_from_pollset_set(interested_parties()),
        &lb_channel_connectivity_, &lb_channel_on_connectivity_changed_,
        nullptr);
  }
}

//
// code for balancer channel and call
//

void GrpcLb::StartPickingLocked() {
  // Start a timer to fall back.
  if (lb_fallback_timeout_ms_ > 0 && serverlist_ == nullptr &&
      !fallback_timer_callback_pending_) {
    grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_;
    // TODO(roth): We currently track this ref manually. Once the
    // ClosureRef API is ready, we should pass the RefCountedPtr<> along
    // with the callback.
    auto self = Ref(DEBUG_LOCATION, "on_fallback_timer");
    self.release();
    GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimerLocked, this,
                      grpc_combiner_scheduler(combiner()));
    fallback_timer_callback_pending_ = true;
    grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
  }
  started_picking_ = true;
  StartBalancerCallLocked();
}

void GrpcLb::StartBalancerCallLocked() {
  GPR_ASSERT(lb_channel_ != nullptr);
  if (shutting_down_) return;
  // Init the LB call data.
  GPR_ASSERT(lb_calld_ == nullptr);
  lb_calld_ = MakeOrphanable<BalancerCallState>(Ref());
  if (grpc_lb_glb_trace.enabled()) {
    gpr_log(GPR_INFO,
            "[grpclb %p] Query for backends (lb_channel: %p, lb_calld: %p)",
            this, lb_channel_, lb_calld_.get());
  }
  lb_calld_->StartQuery();
}

void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
  GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
  grpclb_policy->fallback_timer_callback_pending_ = false;
  // If we receive a serverlist after the timer fires but before this callback
  // actually runs, don't fall back.
  if (grpclb_policy->serverlist_ == nullptr && !grpclb_policy->shutting_down_ &&
      error == GRPC_ERROR_NONE) {
    if (grpc_lb_glb_trace.enabled()) {
      gpr_log(GPR_INFO,
              "[grpclb %p] Falling back to use backends from resolver",
              grpclb_policy);
    }
    GPR_ASSERT(grpclb_policy->fallback_backend_addresses_ != nullptr);
    grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked();
  }
  grpclb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer");
}

void GrpcLb::StartBalancerCallRetryTimerLocked() {
  grpc_millis next_try = lb_call_backoff_.NextAttemptTime();
  if (grpc_lb_glb_trace.enabled()) {
    gpr_log(GPR_INFO, "[grpclb %p] Connection to LB server lost...", this);
    grpc_millis timeout = next_try - ExecCtx::Get()->Now();
    if (timeout > 0) {
      gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active in %" PRId64 "ms.",
              this, timeout);
    } else {
      gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active immediately.",
              this);
    }
  }
  // TODO(roth): We currently track this ref manually. Once the
  // ClosureRef API is ready, we should pass the RefCountedPtr<> along
  // with the callback.
  auto self = Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
  self.release();
  GRPC_CLOSURE_INIT(&lb_on_call_retry_, &GrpcLb::OnBalancerCallRetryTimerLocked,
                    this, grpc_combiner_scheduler(combiner()));
  retry_timer_callback_pending_ = true;
  grpc_timer_init(&lb_call_retry_timer_, next_try, &lb_on_call_retry_);
}

void GrpcLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) {
  GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
  grpclb_policy->retry_timer_callback_pending_ = false;
  if (!grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE &&
      grpclb_policy->lb_calld_ == nullptr) {
    if (grpc_lb_glb_trace.enabled()) {
      gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server",
              grpclb_policy);
    }
    grpclb_policy->StartBalancerCallLocked();
  }
  grpclb_policy->Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
}

// Invoked as part of the update process. It continues watching the LB channel
// until it shuts down or becomes READY.
// It's invoked even if the LB channel stayed READY throughout the update
// (for example if the update is identical).
void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
                                                        grpc_error* error) {
  GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
  if (grpclb_policy->shutting_down_) goto done;
  // Re-initialize the lb_call. This should also take care of updating the
  // embedded RR policy. Note that the current RR policy, if any, will stay in
  // effect until an update from the new lb_call is received.
  switch (grpclb_policy->lb_channel_connectivity_) {
    case GRPC_CHANNEL_CONNECTING:
    case GRPC_CHANNEL_TRANSIENT_FAILURE: {
      // Keep watching the LB channel.
      grpc_channel_element* client_channel_elem =
          grpc_channel_stack_last_element(
              grpc_channel_get_channel_stack(grpclb_policy->lb_channel_));
      GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
      grpc_client_channel_watch_connectivity_state(
          client_channel_elem,
          grpc_polling_entity_create_from_pollset_set(
              grpclb_policy->interested_parties()),
          &grpclb_policy->lb_channel_connectivity_,
          &grpclb_policy->lb_channel_on_connectivity_changed_, nullptr);
      break;
    }
    // The LB channel may be IDLE because it's shut down before the update.
    // Restart the LB call to kick the LB channel into gear.
    case GRPC_CHANNEL_IDLE:
    case GRPC_CHANNEL_READY:
      grpclb_policy->lb_calld_.reset();
      if (grpclb_policy->started_picking_) {
        if (grpclb_policy->retry_timer_callback_pending_) {
          grpc_timer_cancel(&grpclb_policy->lb_call_retry_timer_);
        }
        grpclb_policy->lb_call_backoff_.Reset();
        grpclb_policy->StartBalancerCallLocked();
      }
      // Fall through.
    case GRPC_CHANNEL_SHUTDOWN:
    done:
      grpclb_policy->watching_lb_channel_ = false;
      grpclb_policy->Unref(DEBUG_LOCATION,
                           "watch_lb_channel_connectivity_cb_shutdown");
  }
}

//
// PendingPick
//

// Adds lb_token of selected subchannel (address) to the call's initial
// metadata.
grpc_error* AddLbTokenToInitialMetadata(
    grpc_mdelem lb_token, grpc_linked_mdelem* lb_token_mdelem_storage,
    grpc_metadata_batch* initial_metadata) {
  GPR_ASSERT(lb_token_mdelem_storage != nullptr);
  GPR_ASSERT(!GRPC_MDISNULL(lb_token));
  return grpc_metadata_batch_add_tail(initial_metadata,
                                      lb_token_mdelem_storage, lb_token);
}

// Destroy function used when embedding client stats in call context.
void DestroyClientStats(void* arg) {
  static_cast<GrpcLbClientStats*>(arg)->Unref();
}

void GrpcLb::PendingPickSetMetadataAndContext(PendingPick* pp) {
  /* if connected_subchannel is nullptr, no pick has been made by the RR
   * policy (e.g., all addresses failed to connect). There won't be any
   * user_data/token available */
  if (pp->pick->connected_subchannel != nullptr) {
    if (GPR_LIKELY(!GRPC_MDISNULL(pp->lb_token))) {
      AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(pp->lb_token),
                                  &pp->pick->lb_token_mdelem_storage,
                                  pp->pick->initial_metadata);
    } else {
      gpr_log(GPR_ERROR,
              "[grpclb %p] No LB token for connected subchannel pick %p",
              pp->grpclb_policy, pp->pick);
      abort();
    }
    // Pass on client stats via context. Passes ownership of the reference.
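    // The context entry is consumed by the client_load_reporting filter,
    // which uses it to record per-call load stats; DestroyClientStats()
    // drops the ref when the call context is destroyed. Roughly (sketch
    // only, names not taken from this file), the filter reads it back as:
    //   auto* stats = static_cast<GrpcLbClientStats*>(
    //       call_context[GRPC_GRPCLB_CLIENT_STATS].value);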
    if (pp->client_stats != nullptr) {
      pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value =
          pp->client_stats.release();
      pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy =
          DestroyClientStats;
    }
  } else {
    pp->client_stats.reset();
  }
}

// The pick's original \a on_complete closure is wrapped with this callback so
// that, before the original closure runs, we can add the LB token to the
// call's initial metadata and attach the client stats to the call context.
void GrpcLb::OnPendingPickComplete(void* arg, grpc_error* error) {
  PendingPick* pp = static_cast<PendingPick*>(arg);
  PendingPickSetMetadataAndContext(pp);
  GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_REF(error));
  Delete(pp);
}

GrpcLb::PendingPick* GrpcLb::PendingPickCreate(PickState* pick) {
  PendingPick* pp = New<PendingPick>();
  pp->grpclb_policy = this;
  pp->pick = pick;
  GRPC_CLOSURE_INIT(&pp->on_complete, &GrpcLb::OnPendingPickComplete, pp,
                    grpc_schedule_on_exec_ctx);
  pp->original_on_complete = pick->on_complete;
  pick->on_complete = &pp->on_complete;
  return pp;
}

void GrpcLb::AddPendingPick(PendingPick* pp) {
  pp->next = pending_picks_;
  pending_picks_ = pp;
}

//
// code for interacting with the RR policy
//

// Performs a pick over \a rr_policy_. Given that a pick can return
// immediately (ignoring its completion callback), we need to perform the
// cleanups this callback would otherwise be responsible for.
// If \a force_async is true, then we will manually schedule the
// completion callback even if the pick is available immediately.
bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp,
                                            grpc_error** error) {
  // Check for drops if we are not using fallback backend addresses.
  if (serverlist_ != nullptr) {
    // Look at the index into the serverlist to see if we should drop this
    // call.
    grpc_grpclb_server* server = serverlist_->servers[serverlist_index_++];
    if (serverlist_index_ == serverlist_->num_servers) {
      serverlist_index_ = 0;  // Wrap-around.
    }
    if (server->drop) {
      // Update client load reporting stats to indicate the number of
      // dropped calls. Note that we have to do this here instead of in
      // the client_load_reporting filter, because we do not create a
      // subchannel call (and therefore no client_load_reporting filter)
      // for dropped calls.
      if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) {
        lb_calld_->client_stats()->AddCallDroppedLocked(
            server->load_balance_token);
      }
      if (force_async) {
        GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE);
        Delete(pp);
        return false;
      }
      Delete(pp);
      return true;
    }
  }
  // Set client_stats and user_data.
  if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) {
    pp->client_stats = lb_calld_->client_stats()->Ref();
  }
  GPR_ASSERT(pp->pick->user_data == nullptr);
  pp->pick->user_data = (void**)&pp->lb_token;
  // Pick via the RR policy.
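  // PickLocked() returns true if the pick completed synchronously (whether
  // with a connected subchannel or an error), and false if it will complete
  // asynchronously via pp->on_complete.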
  bool pick_done = rr_policy_->PickLocked(pp->pick, error);
  if (pick_done) {
    PendingPickSetMetadataAndContext(pp);
    if (force_async) {
      GRPC_CLOSURE_SCHED(pp->original_on_complete, *error);
      *error = GRPC_ERROR_NONE;
      pick_done = false;
    }
    Delete(pp);
  }
  // else, the pending pick will be registered and taken care of by the
  // pending pick list inside the RR policy. Eventually,
  // OnPendingPickComplete() will be called, which will (among other
  // things) add the LB token to the call's initial metadata.
  return pick_done;
}

void GrpcLb::CreateRoundRobinPolicyLocked(const Args& args) {
  GPR_ASSERT(rr_policy_ == nullptr);
  rr_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
      "round_robin", args);
  if (GPR_UNLIKELY(rr_policy_ == nullptr)) {
    gpr_log(GPR_ERROR, "[grpclb %p] Failure creating a RoundRobin policy",
            this);
    return;
  }
  // TODO(roth): We currently track this ref manually. Once the new
  // ClosureRef API is done, pass the RefCountedPtr<> along with the closure.
  auto self = Ref(DEBUG_LOCATION, "on_rr_reresolution_requested");
  self.release();
  rr_policy_->SetReresolutionClosureLocked(&on_rr_request_reresolution_);
  grpc_error* rr_state_error = nullptr;
  rr_connectivity_state_ =
      rr_policy_->CheckConnectivityLocked(&rr_state_error);
  // Connectivity state is a function of the RR policy updated/created.
  UpdateConnectivityStateFromRoundRobinPolicyLocked(rr_state_error);
  // Add the gRPC LB's interested_parties pollset_set to that of the newly
  // created RR policy. This will make the RR policy progress upon activity on
  // gRPC LB, which in turn is tied to the application's call.
  grpc_pollset_set_add_pollset_set(rr_policy_->interested_parties(),
                                   interested_parties());
  // Subscribe to changes to the connectivity of the new RR.
  // TODO(roth): We currently track this ref manually. Once the new
  // ClosureRef API is done, pass the RefCountedPtr<> along with the closure.
  self = Ref(DEBUG_LOCATION, "on_rr_connectivity_changed");
  self.release();
  rr_policy_->NotifyOnStateChangeLocked(&rr_connectivity_state_,
                                        &on_rr_connectivity_changed_);
  rr_policy_->ExitIdleLocked();
  // Send pending picks to RR policy.
  PendingPick* pp;
  while ((pp = pending_picks_)) {
    pending_picks_ = pp->next;
    if (grpc_lb_glb_trace.enabled()) {
      gpr_log(GPR_INFO,
              "[grpclb %p] Pending pick about to (async) PICK from RR %p",
              this, rr_policy_.get());
    }
    grpc_error* error = GRPC_ERROR_NONE;
    PickFromRoundRobinPolicyLocked(true /* force_async */, pp, &error);
  }
}

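// Builds the channel args used to create or update the RR policy. The LB
// addresses are taken from the most recent serverlist received from the
// balancer or, if no serverlist is available, from the fallback backend
// addresses provided by the resolver.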
grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() {
  grpc_lb_addresses* addresses;
  bool is_backend_from_grpclb_load_balancer = false;
  if (serverlist_ != nullptr) {
    GPR_ASSERT(serverlist_->num_servers > 0);
    addresses = ProcessServerlist(serverlist_);
    is_backend_from_grpclb_load_balancer = true;
  } else {
    // If CreateOrUpdateRoundRobinPolicyLocked() is invoked when we haven't
    // received any serverlist from the balancer, we use the fallback backends
    // returned by the resolver. Note that the fallback backend list may be
    // empty, in which case the new round_robin policy will keep the requested
    // picks pending.
    GPR_ASSERT(fallback_backend_addresses_ != nullptr);
    addresses = grpc_lb_addresses_copy(fallback_backend_addresses_);
  }
  GPR_ASSERT(addresses != nullptr);
  // Replace the LB addresses in the channel args that we pass down to
  // the subchannel.
  static const char* keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES};
  const grpc_arg args_to_add[] = {
      grpc_lb_addresses_create_channel_arg(addresses),
      // A channel arg indicating if the target is a backend inferred from a
      // grpclb load balancer.
      grpc_channel_arg_integer_create(
          const_cast<char*>(
              GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER),
          is_backend_from_grpclb_load_balancer),
  };
  grpc_channel_args* args = grpc_channel_args_copy_and_add_and_remove(
      args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add,
      GPR_ARRAY_SIZE(args_to_add));
  grpc_lb_addresses_destroy(addresses);
  return args;
}

void GrpcLb::CreateOrUpdateRoundRobinPolicyLocked() {
  if (shutting_down_) return;
  grpc_channel_args* args = CreateRoundRobinPolicyArgsLocked();
  GPR_ASSERT(args != nullptr);
  if (rr_policy_ != nullptr) {
    if (grpc_lb_glb_trace.enabled()) {
      gpr_log(GPR_INFO, "[grpclb %p] Updating RR policy %p", this,
              rr_policy_.get());
    }
    rr_policy_->UpdateLocked(*args);
  } else {
    LoadBalancingPolicy::Args lb_policy_args;
    lb_policy_args.combiner = combiner();
    lb_policy_args.client_channel_factory = client_channel_factory();
    lb_policy_args.args = args;
    CreateRoundRobinPolicyLocked(lb_policy_args);
    if (grpc_lb_glb_trace.enabled()) {
      gpr_log(GPR_INFO, "[grpclb %p] Created new RR policy %p", this,
              rr_policy_.get());
    }
  }
  grpc_channel_args_destroy(args);
}

void GrpcLb::OnRoundRobinRequestReresolutionLocked(void* arg,
                                                   grpc_error* error) {
  GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
  if (grpclb_policy->shutting_down_ || error != GRPC_ERROR_NONE) {
    grpclb_policy->Unref(DEBUG_LOCATION, "on_rr_reresolution_requested");
    return;
  }
  if (grpc_lb_glb_trace.enabled()) {
    gpr_log(
        GPR_INFO,
        "[grpclb %p] Re-resolution requested from the internal RR policy (%p).",
        grpclb_policy, grpclb_policy->rr_policy_.get());
  }
  // If we are talking to a balancer, we expect to get updated addresses from
  // the balancer, so we can ignore the re-resolution request from the RR
  // policy. Otherwise, handle the re-resolution request using the
  // grpclb policy's original re-resolution closure.
  if (grpclb_policy->lb_calld_ == nullptr ||
      !grpclb_policy->lb_calld_->seen_initial_response()) {
    grpclb_policy->TryReresolutionLocked(&grpc_lb_glb_trace, GRPC_ERROR_NONE);
  }
  // Give back the wrapper closure to the RR policy.
  grpclb_policy->rr_policy_->SetReresolutionClosureLocked(
      &grpclb_policy->on_rr_request_reresolution_);
}

void GrpcLb::UpdateConnectivityStateFromRoundRobinPolicyLocked(
    grpc_error* rr_state_error) {
  const grpc_connectivity_state curr_glb_state =
      grpc_connectivity_state_check(&state_tracker_);
  /* The new connectivity status is a function of the previous one and the new
   * input coming from the status of the RR policy.
   *
   *  current state (grpclb's)
   *  |
   *  v  || I  |  C  |  R  |  TF  |  SD  |  <- new state (RR's)
   *  ===++====+=====+=====+======+======+
   *   I || I  |  C  |  R  |  [I] |  [I] |
   *  ---++----+-----+-----+------+------+
   *   C || I  |  C  |  R  |  [C] |  [C] |
   *  ---++----+-----+-----+------+------+
   *   R || I  |  C  |  R  |  [R] |  [R] |
   *  ---++----+-----+-----+------+------+
   *  TF || I  |  C  |  R  | [TF] | [TF] |
   *  ---++----+-----+-----+------+------+
   *  SD || NA |  NA |  NA |  NA  |  NA  | (*)
   *  ---++----+-----+-----+------+------+
   *
   * A [STATE] indicates that the old RR policy is kept. In those cases, STATE
   * is the current state of grpclb, which is left untouched.
   *
   * In summary, if the new state is TRANSIENT_FAILURE or SHUTDOWN, stick to
   * the previous RR instance.
   *
   * Note that the status is never updated to SHUTDOWN as a result of calling
   * this function. Only the policy's shutdown code has the power to set that
   * state.
   *
   * (*) This function must not be called while shutting down. */
  GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN);
  switch (rr_connectivity_state_) {
    case GRPC_CHANNEL_TRANSIENT_FAILURE:
    case GRPC_CHANNEL_SHUTDOWN:
      GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE);
      break;
    case GRPC_CHANNEL_IDLE:
    case GRPC_CHANNEL_CONNECTING:
    case GRPC_CHANNEL_READY:
      GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE);
  }
  if (grpc_lb_glb_trace.enabled()) {
    gpr_log(
        GPR_INFO,
        "[grpclb %p] Setting grpclb's state to %s from new RR policy %p state.",
        this, grpc_connectivity_state_name(rr_connectivity_state_),
        rr_policy_.get());
  }
  grpc_connectivity_state_set(&state_tracker_, rr_connectivity_state_,
                              rr_state_error,
                              "update_lb_connectivity_status_locked");
}

void GrpcLb::OnRoundRobinConnectivityChangedLocked(void* arg,
                                                   grpc_error* error) {
  GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
  if (grpclb_policy->shutting_down_) {
    grpclb_policy->Unref(DEBUG_LOCATION, "on_rr_connectivity_changed");
    return;
  }
  grpclb_policy->UpdateConnectivityStateFromRoundRobinPolicyLocked(
      GRPC_ERROR_REF(error));
  // Resubscribe. Reuse the "on_rr_connectivity_changed" ref.
  grpclb_policy->rr_policy_->NotifyOnStateChangeLocked(
      &grpclb_policy->rr_connectivity_state_,
      &grpclb_policy->on_rr_connectivity_changed_);
}

//
// factory
//

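// Factory for GrpcLb policy instances. A grpclb policy is only created if
// the resolved address list contains at least one balancer address.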
class GrpcLbFactory : public LoadBalancingPolicyFactory {
 public:
  OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
      const LoadBalancingPolicy::Args& args) const override {
    /* Count the number of gRPC-LB addresses. There must be at least one. */
    const grpc_arg* arg =
        grpc_channel_args_find(args.args, GRPC_ARG_LB_ADDRESSES);
    if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
      return nullptr;
    }
    grpc_lb_addresses* addresses =
        static_cast<grpc_lb_addresses*>(arg->value.pointer.p);
    size_t num_grpclb_addrs = 0;
    for (size_t i = 0; i < addresses->num_addresses; ++i) {
      if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
    }
    if (num_grpclb_addrs == 0) return nullptr;
    return OrphanablePtr<LoadBalancingPolicy>(New<GrpcLb>(addresses, args));
  }

  const char* name() const override { return "grpclb"; }
};

}  // namespace

}  // namespace grpc_core

//
// Plugin registration
//

namespace {

// Only add client_load_reporting filter if the grpclb LB policy is used.
bool maybe_add_client_load_reporting_filter(grpc_channel_stack_builder* builder,
                                            void* arg) {
  const grpc_channel_args* args =
      grpc_channel_stack_builder_get_channel_arguments(builder);
  const grpc_arg* channel_arg =
      grpc_channel_args_find(args, GRPC_ARG_LB_POLICY_NAME);
  if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_STRING &&
      strcmp(channel_arg->value.string, "grpclb") == 0) {
    return grpc_channel_stack_builder_append_filter(
        builder, (const grpc_channel_filter*)arg, nullptr, nullptr);
  }
  return true;
}

}  // namespace

void grpc_lb_policy_grpclb_init() {
  grpc_core::LoadBalancingPolicyRegistry::Builder::
      RegisterLoadBalancingPolicyFactory(
          grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
              grpc_core::New<grpc_core::GrpcLbFactory>()));
  grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
                                   GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
                                   maybe_add_client_load_reporting_filter,
                                   (void*)&grpc_client_load_reporting_filter);
}

void grpc_lb_policy_grpclb_shutdown() {}