      1 /*
      2  *
      3  * Copyright 2016 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 /// Implementation of the gRPC LB policy.
     20 ///
     21 /// This policy takes as input a list of resolved addresses, which must
     22 /// include at least one balancer address.
     23 ///
      24 /// An internal channel (\a lb_channel_) is created for the addresses in
      25 /// that list that are balancers.  This channel behaves just like a regular
     26 /// channel that uses pick_first to select from the list of balancer
     27 /// addresses.
     28 ///
     29 /// The first time the policy gets a request for a pick, a ping, or to exit
     30 /// the idle state, \a StartPickingLocked() is called. This method is
     31 /// responsible for instantiating the internal *streaming* call to the LB
     32 /// server (whichever address pick_first chose).  The call will be complete
      33 /// either when the balancer sends status or when we cancel the call (e.g.,
      34 /// because we are shutting down).  If needed, we retry the call.  If we
     35 /// received at least one valid message from the server, a new call attempt
     36 /// will be made immediately; otherwise, we apply back-off delays between
     37 /// attempts.
     38 ///
     39 /// We maintain an internal round_robin policy instance for distributing
     40 /// requests across backends.  Whenever we receive a new serverlist from
     41 /// the balancer, we update the round_robin policy with the new list of
     42 /// addresses.  If we cannot communicate with the balancer on startup,
     43 /// however, we may enter fallback mode, in which case we will populate
     44 /// the RR policy's addresses from the backend addresses returned by the
     45 /// resolver.
     46 ///
     47 /// Once an RR policy instance is in place (and getting updated as described),
     48 /// calls for a pick, a ping, or a cancellation will be serviced right
     49 /// away by forwarding them to the RR instance.  Any time there's no RR
     50 /// policy available (i.e., right after the creation of the gRPCLB policy),
     51 /// pick and ping requests are added to a list of pending picks and pings
     52 /// to be flushed and serviced when the RR policy instance becomes available.
     53 ///
     54 /// \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
     55 /// high level design and details.
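         ///
         /// Rough sketch of the structure described above (illustrative only,
         /// not normative):
         ///
         ///      resolved addresses (balancers + backends)
         ///                       |
         ///                    grpclb
         ///                  /        \
         ///   lb_channel_ (pick_first over     rr_policy_ (round_robin over the
         ///   the balancer addresses; hosts    balancer's serverlist, or over
         ///   the streaming BalanceLoad call)  the resolver's backends when in
         ///                                    fallback mode)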
     56 
     57 // With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
     58 // using that endpoint. Because of various transitive includes in uv.h,
     59 // including windows.h on Windows, uv.h must be included before other system
     60 // headers. Therefore, sockaddr.h must always be included first.
     61 #include <grpc/support/port_platform.h>
     62 
     63 #include "src/core/lib/iomgr/sockaddr.h"
     64 #include "src/core/lib/iomgr/socket_utils.h"
     65 
     66 #include <inttypes.h>
     67 #include <limits.h>
     68 #include <string.h>
     69 
     70 #include <grpc/byte_buffer_reader.h>
     71 #include <grpc/grpc.h>
     72 #include <grpc/support/alloc.h>
     73 #include <grpc/support/string_util.h>
     74 #include <grpc/support/time.h>
     75 
     76 #include "src/core/ext/filters/client_channel/client_channel.h"
     77 #include "src/core/ext/filters/client_channel/client_channel_factory.h"
     78 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h"
     79 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
     80 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h"
     81 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
     82 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h"
     83 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
     84 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
     85 #include "src/core/ext/filters/client_channel/parse_address.h"
     86 #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
     87 #include "src/core/ext/filters/client_channel/subchannel_index.h"
     88 #include "src/core/lib/backoff/backoff.h"
     89 #include "src/core/lib/channel/channel_args.h"
     90 #include "src/core/lib/channel/channel_stack.h"
     91 #include "src/core/lib/gpr/host_port.h"
     92 #include "src/core/lib/gpr/string.h"
     93 #include "src/core/lib/gprpp/manual_constructor.h"
     94 #include "src/core/lib/gprpp/memory.h"
     95 #include "src/core/lib/gprpp/mutex_lock.h"
     96 #include "src/core/lib/gprpp/orphanable.h"
     97 #include "src/core/lib/gprpp/ref_counted_ptr.h"
     98 #include "src/core/lib/iomgr/combiner.h"
     99 #include "src/core/lib/iomgr/sockaddr.h"
    100 #include "src/core/lib/iomgr/sockaddr_utils.h"
    101 #include "src/core/lib/iomgr/timer.h"
    102 #include "src/core/lib/slice/slice_hash_table.h"
    103 #include "src/core/lib/slice/slice_internal.h"
    104 #include "src/core/lib/slice/slice_string_helpers.h"
    105 #include "src/core/lib/surface/call.h"
    106 #include "src/core/lib/surface/channel.h"
    107 #include "src/core/lib/surface/channel_init.h"
    108 #include "src/core/lib/transport/static_metadata.h"
    109 
    110 #define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1
    111 #define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6
    112 #define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120
    113 #define GRPC_GRPCLB_RECONNECT_JITTER 0.2
    114 #define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000
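         // For illustration only (assuming BackOff applies the values above
         // directly, jitter ignored): retries of a balancer call that never
         // yielded a valid response are delayed roughly 1s, 1.6s, 2.56s, 4.1s,
         // ..., each delay 1.6x the previous one and capped at 120s, with
         // +/-20% jitter applied to every delay.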
    115 
    116 namespace grpc_core {
    117 
    118 TraceFlag grpc_lb_glb_trace(false, "glb");
    119 
    120 namespace {
    121 
    122 class GrpcLb : public LoadBalancingPolicy {
    123  public:
    124   GrpcLb(const grpc_lb_addresses* addresses, const Args& args);
    125 
    126   void UpdateLocked(const grpc_channel_args& args) override;
    127   bool PickLocked(PickState* pick, grpc_error** error) override;
    128   void CancelPickLocked(PickState* pick, grpc_error* error) override;
    129   void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
    130                                  uint32_t initial_metadata_flags_eq,
    131                                  grpc_error* error) override;
    132   void NotifyOnStateChangeLocked(grpc_connectivity_state* state,
    133                                  grpc_closure* closure) override;
    134   grpc_connectivity_state CheckConnectivityLocked(
    135       grpc_error** connectivity_error) override;
    136   void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
    137   void ExitIdleLocked() override;
    138   void ResetBackoffLocked() override;
    139   void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
    140                                 ChildRefsList* child_channels) override;
    141 
    142  private:
    143   /// Linked list of pending pick requests. It stores all information needed to
     144   /// eventually call (Round Robin's) pick() on them. They mainly stay pending,
    145   /// waiting for the RR policy to be created.
    146   ///
    147   /// Note that when a pick is sent to the RR policy, we inject our own
    148   /// on_complete callback, so that we can intercept the result before
    149   /// invoking the original on_complete callback.  This allows us to set the
    150   /// LB token metadata and add client_stats to the call context.
    151   /// See \a pending_pick_complete() for details.
    152   struct PendingPick {
     153     // The grpclb instance that created this pending pick. This instance is not
    154     // owned; reference counts are untouched. It's used only for logging
    155     // purposes.
    156     GrpcLb* grpclb_policy;
    157     // The original pick.
    158     PickState* pick;
    159     // Our on_complete closure and the original one.
    160     grpc_closure on_complete;
    161     grpc_closure* original_on_complete;
    162     // The LB token associated with the pick.  This is set via user_data in
    163     // the pick.
    164     grpc_mdelem lb_token;
    165     // Stats for client-side load reporting.
    166     RefCountedPtr<GrpcLbClientStats> client_stats;
    167     // Next pending pick.
    168     PendingPick* next = nullptr;
    169   };
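
           // Rough lifecycle of a PendingPick in terms of the helpers declared
           // below (an illustrative sketch, not a new code path):
           //   PendingPick* pp = PendingPickCreate(pick);  // wrap original pick
           //   AddPendingPick(pp);         // queue it while there is no RR policy
           //   // ... later, once rr_policy_ is available ...
           //   PickFromRoundRobinPolicyLocked(true /* force_async */, pp, &error);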
    170 
    171   /// Contains a call to the LB server and all the data related to the call.
    172   class BalancerCallState
    173       : public InternallyRefCountedWithTracing<BalancerCallState> {
    174    public:
    175     explicit BalancerCallState(
    176         RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy);
    177 
    178     // It's the caller's responsibility to ensure that Orphan() is called from
    179     // inside the combiner.
    180     void Orphan() override;
    181 
    182     void StartQuery();
    183 
    184     GrpcLbClientStats* client_stats() const { return client_stats_.get(); }
    185 
    186     bool seen_initial_response() const { return seen_initial_response_; }
    187 
    188    private:
    189     // So Delete() can access our private dtor.
    190     template <typename T>
    191     friend void grpc_core::Delete(T*);
    192 
    193     ~BalancerCallState();
    194 
    195     GrpcLb* grpclb_policy() const {
    196       return static_cast<GrpcLb*>(grpclb_policy_.get());
    197     }
    198 
    199     void ScheduleNextClientLoadReportLocked();
    200     void SendClientLoadReportLocked();
    201 
    202     static bool LoadReportCountersAreZero(grpc_grpclb_request* request);
    203 
    204     static void MaybeSendClientLoadReportLocked(void* arg, grpc_error* error);
    205     static void ClientLoadReportDoneLocked(void* arg, grpc_error* error);
    206     static void OnInitialRequestSentLocked(void* arg, grpc_error* error);
    207     static void OnBalancerMessageReceivedLocked(void* arg, grpc_error* error);
    208     static void OnBalancerStatusReceivedLocked(void* arg, grpc_error* error);
    209 
    210     // The owning LB policy.
    211     RefCountedPtr<LoadBalancingPolicy> grpclb_policy_;
    212 
    213     // The streaming call to the LB server. Always non-NULL.
    214     grpc_call* lb_call_ = nullptr;
    215 
    216     // recv_initial_metadata
    217     grpc_metadata_array lb_initial_metadata_recv_;
    218 
    219     // send_message
    220     grpc_byte_buffer* send_message_payload_ = nullptr;
    221     grpc_closure lb_on_initial_request_sent_;
    222 
    223     // recv_message
    224     grpc_byte_buffer* recv_message_payload_ = nullptr;
    225     grpc_closure lb_on_balancer_message_received_;
    226     bool seen_initial_response_ = false;
    227 
    228     // recv_trailing_metadata
    229     grpc_closure lb_on_balancer_status_received_;
    230     grpc_metadata_array lb_trailing_metadata_recv_;
    231     grpc_status_code lb_call_status_;
    232     grpc_slice lb_call_status_details_;
    233 
    234     // The stats for client-side load reporting associated with this LB call.
    235     // Created after the first serverlist is received.
    236     RefCountedPtr<GrpcLbClientStats> client_stats_;
    237     grpc_millis client_stats_report_interval_ = 0;
    238     grpc_timer client_load_report_timer_;
    239     bool client_load_report_timer_callback_pending_ = false;
    240     bool last_client_load_report_counters_were_zero_ = false;
    241     bool client_load_report_is_due_ = false;
    242     // The closure used for either the load report timer or the callback for
    243     // completion of sending the load report.
    244     grpc_closure client_load_report_closure_;
    245   };
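
           // Illustrative sketch of how the policy presumably drives this class
           // (see StartBalancerCallLocked(), declared below; exact form assumed):
           //   lb_calld_ = MakeOrphanable<BalancerCallState>(Ref());
           //   lb_calld_->StartQuery();
           // The call then runs until OnBalancerStatusReceivedLocked() releases
           // the initial ref, or until lb_calld_.reset() orphans it.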
    246 
    247   ~GrpcLb();
    248 
    249   void ShutdownLocked() override;
    250 
    251   // Helper function used in ctor and UpdateLocked().
    252   void ProcessChannelArgsLocked(const grpc_channel_args& args);
    253 
    254   // Methods for dealing with the balancer channel and call.
    255   void StartPickingLocked();
    256   void StartBalancerCallLocked();
    257   static void OnFallbackTimerLocked(void* arg, grpc_error* error);
    258   void StartBalancerCallRetryTimerLocked();
    259   static void OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error);
    260   static void OnBalancerChannelConnectivityChangedLocked(void* arg,
    261                                                          grpc_error* error);
    262 
    263   // Pending pick methods.
    264   static void PendingPickSetMetadataAndContext(PendingPick* pp);
    265   PendingPick* PendingPickCreate(PickState* pick);
    266   void AddPendingPick(PendingPick* pp);
    267   static void OnPendingPickComplete(void* arg, grpc_error* error);
    268 
    269   // Methods for dealing with the RR policy.
    270   void CreateOrUpdateRoundRobinPolicyLocked();
    271   grpc_channel_args* CreateRoundRobinPolicyArgsLocked();
    272   void CreateRoundRobinPolicyLocked(const Args& args);
    273   bool PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp,
    274                                       grpc_error** error);
    275   void UpdateConnectivityStateFromRoundRobinPolicyLocked(
    276       grpc_error* rr_state_error);
    277   static void OnRoundRobinConnectivityChangedLocked(void* arg,
    278                                                     grpc_error* error);
    279   static void OnRoundRobinRequestReresolutionLocked(void* arg,
    280                                                     grpc_error* error);
    281 
    282   // Who the client is trying to communicate with.
    283   const char* server_name_ = nullptr;
    284 
    285   // Current channel args from the resolver.
    286   grpc_channel_args* args_ = nullptr;
    287 
    288   // Internal state.
    289   bool started_picking_ = false;
    290   bool shutting_down_ = false;
    291   grpc_connectivity_state_tracker state_tracker_;
    292 
    293   // The channel for communicating with the LB server.
    294   grpc_channel* lb_channel_ = nullptr;
    295   // Mutex to protect the channel to the LB server. This is used when
    296   // processing a channelz request.
    297   gpr_mu lb_channel_mu_;
    298   grpc_connectivity_state lb_channel_connectivity_;
    299   grpc_closure lb_channel_on_connectivity_changed_;
    300   // Are we already watching the LB channel's connectivity?
    301   bool watching_lb_channel_ = false;
    302   // Response generator to inject address updates into lb_channel_.
    303   RefCountedPtr<FakeResolverResponseGenerator> response_generator_;
    304 
    305   // The data associated with the current LB call. It holds a ref to this LB
    306   // policy. It's initialized every time we query for backends. It's reset to
    307   // NULL whenever the current LB call is no longer needed (e.g., the LB policy
    308   // is shutting down, or the LB call has ended). A non-NULL lb_calld_ always
    309   // contains a non-NULL lb_call_.
    310   OrphanablePtr<BalancerCallState> lb_calld_;
    311   // Timeout in milliseconds for the LB call. 0 means no deadline.
    312   int lb_call_timeout_ms_ = 0;
    313   // Balancer call retry state.
    314   BackOff lb_call_backoff_;
    315   bool retry_timer_callback_pending_ = false;
    316   grpc_timer lb_call_retry_timer_;
    317   grpc_closure lb_on_call_retry_;
    318 
    319   // The deserialized response from the balancer. May be nullptr until one
    320   // such response has arrived.
    321   grpc_grpclb_serverlist* serverlist_ = nullptr;
    322   // Index into serverlist for next pick.
    323   // If the server at this index is a drop, we return a drop.
    324   // Otherwise, we delegate to the RR policy.
    325   size_t serverlist_index_ = 0;
    326 
     327   // Timeout in milliseconds before using fallback backend addresses.
    328   // 0 means not using fallback.
    329   int lb_fallback_timeout_ms_ = 0;
    330   // The backend addresses from the resolver.
    331   grpc_lb_addresses* fallback_backend_addresses_ = nullptr;
    332   // Fallback timer.
    333   bool fallback_timer_callback_pending_ = false;
    334   grpc_timer lb_fallback_timer_;
    335   grpc_closure lb_on_fallback_;
    336 
    337   // Pending picks that are waiting on the RR policy's connectivity.
    338   PendingPick* pending_picks_ = nullptr;
    339 
    340   // The RR policy to use for the backends.
    341   OrphanablePtr<LoadBalancingPolicy> rr_policy_;
    342   grpc_connectivity_state rr_connectivity_state_;
    343   grpc_closure on_rr_connectivity_changed_;
    344   grpc_closure on_rr_request_reresolution_;
    345 };
    346 
    347 //
    348 // serverlist parsing code
    349 //
    350 
    351 // vtable for LB tokens in grpc_lb_addresses
    352 void* lb_token_copy(void* token) {
    353   return token == nullptr
    354              ? nullptr
    355              : (void*)GRPC_MDELEM_REF(grpc_mdelem{(uintptr_t)token}).payload;
    356 }
    357 void lb_token_destroy(void* token) {
    358   if (token != nullptr) {
    359     GRPC_MDELEM_UNREF(grpc_mdelem{(uintptr_t)token});
    360   }
    361 }
    362 int lb_token_cmp(void* token1, void* token2) {
    363   if (token1 > token2) return 1;
    364   if (token1 < token2) return -1;
    365   return 0;
    366 }
    367 const grpc_lb_user_data_vtable lb_token_vtable = {
    368     lb_token_copy, lb_token_destroy, lb_token_cmp};
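
         // Note on the comparator above: lb_token_cmp() orders tokens by their
         // raw mdelem payload pointers, so two tokens compare equal only when
         // they refer to the same mdelem. Illustrative (hypothetical) use of
         // the vtable:
         //   void* t = lb_token_copy((void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload);
         //   GPR_ASSERT(lb_token_cmp(
         //                  t, (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload) == 0);
         //   lb_token_destroy(t);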
    369 
    370 // Returns the backend addresses extracted from the given addresses.
    371 grpc_lb_addresses* ExtractBackendAddresses(const grpc_lb_addresses* addresses) {
    372   // First pass: count the number of backend addresses.
    373   size_t num_backends = 0;
    374   for (size_t i = 0; i < addresses->num_addresses; ++i) {
    375     if (!addresses->addresses[i].is_balancer) {
    376       ++num_backends;
    377     }
    378   }
    379   // Second pass: actually populate the addresses and (empty) LB tokens.
    380   grpc_lb_addresses* backend_addresses =
    381       grpc_lb_addresses_create(num_backends, &lb_token_vtable);
    382   size_t num_copied = 0;
    383   for (size_t i = 0; i < addresses->num_addresses; ++i) {
    384     if (addresses->addresses[i].is_balancer) continue;
    385     const grpc_resolved_address* addr = &addresses->addresses[i].address;
    386     grpc_lb_addresses_set_address(backend_addresses, num_copied, &addr->addr,
    387                                   addr->len, false /* is_balancer */,
    388                                   nullptr /* balancer_name */,
    389                                   (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload);
    390     ++num_copied;
    391   }
    392   return backend_addresses;
    393 }
    394 
    395 bool IsServerValid(const grpc_grpclb_server* server, size_t idx, bool log) {
    396   if (server->drop) return false;
    397   const grpc_grpclb_ip_address* ip = &server->ip_address;
    398   if (GPR_UNLIKELY(server->port >> 16 != 0)) {
    399     if (log) {
    400       gpr_log(GPR_ERROR,
    401               "Invalid port '%d' at index %lu of serverlist. Ignoring.",
    402               server->port, (unsigned long)idx);
    403     }
    404     return false;
    405   }
    406   if (GPR_UNLIKELY(ip->size != 4 && ip->size != 16)) {
    407     if (log) {
    408       gpr_log(GPR_ERROR,
    409               "Expected IP to be 4 or 16 bytes, got %d at index %lu of "
    410               "serverlist. Ignoring",
    411               ip->size, (unsigned long)idx);
    412     }
    413     return false;
    414   }
    415   return true;
    416 }
    417 
    418 void ParseServer(const grpc_grpclb_server* server,
    419                  grpc_resolved_address* addr) {
    420   memset(addr, 0, sizeof(*addr));
    421   if (server->drop) return;
    422   const uint16_t netorder_port = grpc_htons((uint16_t)server->port);
     423   /* the addresses are given in binary format (an in_addr/in6_addr struct) in
    424    * server->ip_address.bytes. */
    425   const grpc_grpclb_ip_address* ip = &server->ip_address;
    426   if (ip->size == 4) {
    427     addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in));
    428     grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(&addr->addr);
    429     addr4->sin_family = GRPC_AF_INET;
    430     memcpy(&addr4->sin_addr, ip->bytes, ip->size);
    431     addr4->sin_port = netorder_port;
    432   } else if (ip->size == 16) {
    433     addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in6));
    434     grpc_sockaddr_in6* addr6 = (grpc_sockaddr_in6*)&addr->addr;
    435     addr6->sin6_family = GRPC_AF_INET6;
    436     memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
    437     addr6->sin6_port = netorder_port;
    438   }
    439 }
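
         // For example (illustrative only): a grpc_grpclb_server whose
         // ip_address has size == 4 and bytes == {10, 0, 0, 1}, with port == 443,
         // is parsed into an IPv4 grpc_sockaddr_in equivalent to "10.0.0.1:443",
         // with sin_port stored in network byte order via grpc_htons().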
    440 
    441 // Returns addresses extracted from \a serverlist.
    442 grpc_lb_addresses* ProcessServerlist(const grpc_grpclb_serverlist* serverlist) {
    443   size_t num_valid = 0;
    444   /* first pass: count how many are valid in order to allocate the necessary
    445    * memory in a single block */
    446   for (size_t i = 0; i < serverlist->num_servers; ++i) {
    447     if (IsServerValid(serverlist->servers[i], i, true)) ++num_valid;
    448   }
    449   grpc_lb_addresses* lb_addresses =
    450       grpc_lb_addresses_create(num_valid, &lb_token_vtable);
    451   /* second pass: actually populate the addresses and LB tokens (aka user data
    452    * to the outside world) to be read by the RR policy during its creation.
    453    * Given that the validity tests are very cheap, they are performed again
    454    * instead of marking the valid ones during the first pass, as this would
     455    * incur an allocation due to the arbitrary number of servers. */
    456   size_t addr_idx = 0;
    457   for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) {
    458     const grpc_grpclb_server* server = serverlist->servers[sl_idx];
    459     if (!IsServerValid(serverlist->servers[sl_idx], sl_idx, false)) continue;
    460     GPR_ASSERT(addr_idx < num_valid);
    461     /* address processing */
    462     grpc_resolved_address addr;
    463     ParseServer(server, &addr);
    464     /* lb token processing */
    465     void* user_data;
    466     if (server->has_load_balance_token) {
    467       const size_t lb_token_max_length =
    468           GPR_ARRAY_SIZE(server->load_balance_token);
    469       const size_t lb_token_length =
    470           strnlen(server->load_balance_token, lb_token_max_length);
    471       grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer(
    472           server->load_balance_token, lb_token_length);
    473       user_data =
    474           (void*)grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr)
    475               .payload;
    476     } else {
    477       char* uri = grpc_sockaddr_to_uri(&addr);
    478       gpr_log(GPR_INFO,
    479               "Missing LB token for backend address '%s'. The empty token will "
    480               "be used instead",
    481               uri);
    482       gpr_free(uri);
    483       user_data = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
    484     }
    485     grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len,
    486                                   false /* is_balancer */,
    487                                   nullptr /* balancer_name */, user_data);
    488     ++addr_idx;
    489   }
    490   GPR_ASSERT(addr_idx == num_valid);
    491   return lb_addresses;
    492 }
    493 
    494 //
    495 // GrpcLb::BalancerCallState
    496 //
    497 
    498 GrpcLb::BalancerCallState::BalancerCallState(
    499     RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy)
    500     : InternallyRefCountedWithTracing<BalancerCallState>(&grpc_lb_glb_trace),
    501       grpclb_policy_(std::move(parent_grpclb_policy)) {
    502   GPR_ASSERT(grpclb_policy_ != nullptr);
    503   GPR_ASSERT(!grpclb_policy()->shutting_down_);
    504   // Init the LB call. Note that the LB call will progress every time there's
     505   // activity in grpclb_policy_->interested_parties(), which is composed of
    506   // the polling entities from client_channel.
    507   GPR_ASSERT(grpclb_policy()->server_name_ != nullptr);
    508   GPR_ASSERT(grpclb_policy()->server_name_[0] != '\0');
    509   const grpc_millis deadline =
    510       grpclb_policy()->lb_call_timeout_ms_ == 0
    511           ? GRPC_MILLIS_INF_FUTURE
    512           : ExecCtx::Get()->Now() + grpclb_policy()->lb_call_timeout_ms_;
    513   lb_call_ = grpc_channel_create_pollset_set_call(
    514       grpclb_policy()->lb_channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
    515       grpclb_policy_->interested_parties(),
    516       GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD,
    517       nullptr, deadline, nullptr);
    518   // Init the LB call request payload.
    519   grpc_grpclb_request* request =
    520       grpc_grpclb_request_create(grpclb_policy()->server_name_);
    521   grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
    522   send_message_payload_ =
    523       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
    524   grpc_slice_unref_internal(request_payload_slice);
    525   grpc_grpclb_request_destroy(request);
    526   // Init other data associated with the LB call.
    527   grpc_metadata_array_init(&lb_initial_metadata_recv_);
    528   grpc_metadata_array_init(&lb_trailing_metadata_recv_);
    529   GRPC_CLOSURE_INIT(&lb_on_initial_request_sent_, OnInitialRequestSentLocked,
    530                     this, grpc_combiner_scheduler(grpclb_policy()->combiner()));
    531   GRPC_CLOSURE_INIT(&lb_on_balancer_message_received_,
    532                     OnBalancerMessageReceivedLocked, this,
    533                     grpc_combiner_scheduler(grpclb_policy()->combiner()));
    534   GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_,
    535                     OnBalancerStatusReceivedLocked, this,
    536                     grpc_combiner_scheduler(grpclb_policy()->combiner()));
    537 }
    538 
    539 GrpcLb::BalancerCallState::~BalancerCallState() {
    540   GPR_ASSERT(lb_call_ != nullptr);
    541   grpc_call_unref(lb_call_);
    542   grpc_metadata_array_destroy(&lb_initial_metadata_recv_);
    543   grpc_metadata_array_destroy(&lb_trailing_metadata_recv_);
    544   grpc_byte_buffer_destroy(send_message_payload_);
    545   grpc_byte_buffer_destroy(recv_message_payload_);
    546   grpc_slice_unref_internal(lb_call_status_details_);
    547 }
    548 
    549 void GrpcLb::BalancerCallState::Orphan() {
    550   GPR_ASSERT(lb_call_ != nullptr);
    551   // If we are here because grpclb_policy wants to cancel the call,
    552   // lb_on_balancer_status_received_ will complete the cancellation and clean
     553   // up. Otherwise, we are here because grpclb_policy has to orphan a failed
     554   // call, in which case the following cancellation will be a no-op.
    555   grpc_call_cancel(lb_call_, nullptr);
    556   if (client_load_report_timer_callback_pending_) {
    557     grpc_timer_cancel(&client_load_report_timer_);
    558   }
     559   // Note that the initial ref is held by lb_on_balancer_status_received_
    560   // instead of the caller of this function. So the corresponding unref happens
    561   // in lb_on_balancer_status_received_ instead of here.
    562 }
    563 
    564 void GrpcLb::BalancerCallState::StartQuery() {
    565   GPR_ASSERT(lb_call_ != nullptr);
    566   if (grpc_lb_glb_trace.enabled()) {
    567     gpr_log(GPR_INFO,
    568             "[grpclb %p] Starting LB call (lb_calld: %p, lb_call: %p)",
    569             grpclb_policy_.get(), this, lb_call_);
    570   }
    571   // Create the ops.
    572   grpc_call_error call_error;
    573   grpc_op ops[3];
    574   memset(ops, 0, sizeof(ops));
    575   // Op: send initial metadata.
    576   grpc_op* op = ops;
    577   op->op = GRPC_OP_SEND_INITIAL_METADATA;
    578   op->data.send_initial_metadata.count = 0;
    579   op->flags = 0;
    580   op->reserved = nullptr;
    581   op++;
    582   // Op: send request message.
    583   GPR_ASSERT(send_message_payload_ != nullptr);
    584   op->op = GRPC_OP_SEND_MESSAGE;
    585   op->data.send_message.send_message = send_message_payload_;
    586   op->flags = 0;
    587   op->reserved = nullptr;
    588   op++;
    589   // TODO(roth): We currently track this ref manually.  Once the
    590   // ClosureRef API is ready, we should pass the RefCountedPtr<> along
    591   // with the callback.
    592   auto self = Ref(DEBUG_LOCATION, "on_initial_request_sent");
    593   self.release();
    594   call_error = grpc_call_start_batch_and_execute(
    595       lb_call_, ops, (size_t)(op - ops), &lb_on_initial_request_sent_);
    596   GPR_ASSERT(GRPC_CALL_OK == call_error);
    597   // Op: recv initial metadata.
    598   op = ops;
    599   op->op = GRPC_OP_RECV_INITIAL_METADATA;
    600   op->data.recv_initial_metadata.recv_initial_metadata =
    601       &lb_initial_metadata_recv_;
    602   op->flags = 0;
    603   op->reserved = nullptr;
    604   op++;
    605   // Op: recv response.
    606   op->op = GRPC_OP_RECV_MESSAGE;
    607   op->data.recv_message.recv_message = &recv_message_payload_;
    608   op->flags = 0;
    609   op->reserved = nullptr;
    610   op++;
    611   // TODO(roth): We currently track this ref manually.  Once the
    612   // ClosureRef API is ready, we should pass the RefCountedPtr<> along
    613   // with the callback.
    614   self = Ref(DEBUG_LOCATION, "on_message_received");
    615   self.release();
    616   call_error = grpc_call_start_batch_and_execute(
    617       lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_message_received_);
    618   GPR_ASSERT(GRPC_CALL_OK == call_error);
    619   // Op: recv server status.
    620   op = ops;
    621   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
    622   op->data.recv_status_on_client.trailing_metadata =
    623       &lb_trailing_metadata_recv_;
    624   op->data.recv_status_on_client.status = &lb_call_status_;
    625   op->data.recv_status_on_client.status_details = &lb_call_status_details_;
    626   op->flags = 0;
    627   op->reserved = nullptr;
    628   op++;
    629   // This callback signals the end of the LB call, so it relies on the initial
    630   // ref instead of a new ref. When it's invoked, it's the initial ref that is
    631   // unreffed.
    632   call_error = grpc_call_start_batch_and_execute(
    633       lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_status_received_);
    634   GPR_ASSERT(GRPC_CALL_OK == call_error);
     635 }
    636 
    637 void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
    638   const grpc_millis next_client_load_report_time =
    639       ExecCtx::Get()->Now() + client_stats_report_interval_;
    640   GRPC_CLOSURE_INIT(&client_load_report_closure_,
    641                     MaybeSendClientLoadReportLocked, this,
    642                     grpc_combiner_scheduler(grpclb_policy()->combiner()));
    643   grpc_timer_init(&client_load_report_timer_, next_client_load_report_time,
    644                   &client_load_report_closure_);
    645   client_load_report_timer_callback_pending_ = true;
    646 }
    647 
    648 void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked(
    649     void* arg, grpc_error* error) {
    650   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
    651   GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
    652   lb_calld->client_load_report_timer_callback_pending_ = false;
    653   if (error != GRPC_ERROR_NONE || lb_calld != grpclb_policy->lb_calld_.get()) {
    654     lb_calld->Unref(DEBUG_LOCATION, "client_load_report");
    655     return;
    656   }
    657   // If we've already sent the initial request, then we can go ahead and send
    658   // the load report. Otherwise, we need to wait until the initial request has
    659   // been sent to send this (see OnInitialRequestSentLocked()).
    660   if (lb_calld->send_message_payload_ == nullptr) {
    661     lb_calld->SendClientLoadReportLocked();
    662   } else {
    663     lb_calld->client_load_report_is_due_ = true;
    664   }
    665 }
    666 
    667 bool GrpcLb::BalancerCallState::LoadReportCountersAreZero(
    668     grpc_grpclb_request* request) {
    669   GrpcLbClientStats::DroppedCallCounts* drop_entries =
    670       static_cast<GrpcLbClientStats::DroppedCallCounts*>(
    671           request->client_stats.calls_finished_with_drop.arg);
    672   return request->client_stats.num_calls_started == 0 &&
    673          request->client_stats.num_calls_finished == 0 &&
    674          request->client_stats.num_calls_finished_with_client_failed_to_send ==
    675              0 &&
    676          request->client_stats.num_calls_finished_known_received == 0 &&
    677          (drop_entries == nullptr || drop_entries->size() == 0);
    678 }
    679 
    680 void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
    681   // Construct message payload.
    682   GPR_ASSERT(send_message_payload_ == nullptr);
    683   grpc_grpclb_request* request =
    684       grpc_grpclb_load_report_request_create_locked(client_stats_.get());
    685   // Skip client load report if the counters were all zero in the last
    686   // report and they are still zero in this one.
    687   if (LoadReportCountersAreZero(request)) {
    688     if (last_client_load_report_counters_were_zero_) {
    689       grpc_grpclb_request_destroy(request);
    690       ScheduleNextClientLoadReportLocked();
    691       return;
    692     }
    693     last_client_load_report_counters_were_zero_ = true;
    694   } else {
    695     last_client_load_report_counters_were_zero_ = false;
    696   }
    697   grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
    698   send_message_payload_ =
    699       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
    700   grpc_slice_unref_internal(request_payload_slice);
    701   grpc_grpclb_request_destroy(request);
    702   // Send the report.
    703   grpc_op op;
    704   memset(&op, 0, sizeof(op));
    705   op.op = GRPC_OP_SEND_MESSAGE;
    706   op.data.send_message.send_message = send_message_payload_;
    707   GRPC_CLOSURE_INIT(&client_load_report_closure_, ClientLoadReportDoneLocked,
    708                     this, grpc_combiner_scheduler(grpclb_policy()->combiner()));
    709   grpc_call_error call_error = grpc_call_start_batch_and_execute(
    710       lb_call_, &op, 1, &client_load_report_closure_);
    711   if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
    712     gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", grpclb_policy_.get(),
    713             call_error);
    714     GPR_ASSERT(GRPC_CALL_OK == call_error);
    715   }
    716 }
    717 
    718 void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked(void* arg,
    719                                                            grpc_error* error) {
    720   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
    721   GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
    722   grpc_byte_buffer_destroy(lb_calld->send_message_payload_);
    723   lb_calld->send_message_payload_ = nullptr;
    724   if (error != GRPC_ERROR_NONE || lb_calld != grpclb_policy->lb_calld_.get()) {
    725     lb_calld->Unref(DEBUG_LOCATION, "client_load_report");
    726     return;
    727   }
    728   lb_calld->ScheduleNextClientLoadReportLocked();
    729 }
    730 
    731 void GrpcLb::BalancerCallState::OnInitialRequestSentLocked(void* arg,
    732                                                            grpc_error* error) {
    733   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
    734   grpc_byte_buffer_destroy(lb_calld->send_message_payload_);
    735   lb_calld->send_message_payload_ = nullptr;
    736   // If we attempted to send a client load report before the initial request was
    737   // sent (and this lb_calld is still in use), send the load report now.
    738   if (lb_calld->client_load_report_is_due_ &&
    739       lb_calld == lb_calld->grpclb_policy()->lb_calld_.get()) {
    740     lb_calld->SendClientLoadReportLocked();
    741     lb_calld->client_load_report_is_due_ = false;
    742   }
    743   lb_calld->Unref(DEBUG_LOCATION, "on_initial_request_sent");
    744 }
    745 
    746 void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
    747     void* arg, grpc_error* error) {
    748   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
    749   GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
    750   // Empty payload means the LB call was cancelled.
    751   if (lb_calld != grpclb_policy->lb_calld_.get() ||
    752       lb_calld->recv_message_payload_ == nullptr) {
    753     lb_calld->Unref(DEBUG_LOCATION, "on_message_received");
    754     return;
    755   }
    756   grpc_byte_buffer_reader bbr;
    757   grpc_byte_buffer_reader_init(&bbr, lb_calld->recv_message_payload_);
    758   grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
    759   grpc_byte_buffer_reader_destroy(&bbr);
    760   grpc_byte_buffer_destroy(lb_calld->recv_message_payload_);
    761   lb_calld->recv_message_payload_ = nullptr;
    762   grpc_grpclb_initial_response* initial_response;
    763   grpc_grpclb_serverlist* serverlist;
    764   if (!lb_calld->seen_initial_response_ &&
    765       (initial_response = grpc_grpclb_initial_response_parse(response_slice)) !=
    766           nullptr) {
    767     // Have NOT seen initial response, look for initial response.
    768     if (initial_response->has_client_stats_report_interval) {
    769       lb_calld->client_stats_report_interval_ = GPR_MAX(
    770           GPR_MS_PER_SEC, grpc_grpclb_duration_to_millis(
    771                               &initial_response->client_stats_report_interval));
    772       if (grpc_lb_glb_trace.enabled()) {
    773         gpr_log(GPR_INFO,
    774                 "[grpclb %p] Received initial LB response message; "
    775                 "client load reporting interval = %" PRId64 " milliseconds",
    776                 grpclb_policy, lb_calld->client_stats_report_interval_);
    777       }
    778     } else if (grpc_lb_glb_trace.enabled()) {
    779       gpr_log(GPR_INFO,
    780               "[grpclb %p] Received initial LB response message; client load "
    781               "reporting NOT enabled",
    782               grpclb_policy);
    783     }
    784     grpc_grpclb_initial_response_destroy(initial_response);
    785     lb_calld->seen_initial_response_ = true;
    786   } else if ((serverlist = grpc_grpclb_response_parse_serverlist(
    787                   response_slice)) != nullptr) {
    788     // Have seen initial response, look for serverlist.
    789     GPR_ASSERT(lb_calld->lb_call_ != nullptr);
    790     if (grpc_lb_glb_trace.enabled()) {
    791       gpr_log(GPR_INFO,
    792               "[grpclb %p] Serverlist with %" PRIuPTR " servers received",
    793               grpclb_policy, serverlist->num_servers);
    794       for (size_t i = 0; i < serverlist->num_servers; ++i) {
    795         grpc_resolved_address addr;
    796         ParseServer(serverlist->servers[i], &addr);
    797         char* ipport;
    798         grpc_sockaddr_to_string(&ipport, &addr, false);
    799         gpr_log(GPR_INFO, "[grpclb %p] Serverlist[%" PRIuPTR "]: %s",
    800                 grpclb_policy, i, ipport);
    801         gpr_free(ipport);
    802       }
    803     }
    804     /* update serverlist */
    805     if (serverlist->num_servers > 0) {
    806       // Start sending client load report only after we start using the
    807       // serverlist returned from the current LB call.
    808       if (lb_calld->client_stats_report_interval_ > 0 &&
    809           lb_calld->client_stats_ == nullptr) {
    810         lb_calld->client_stats_.reset(New<GrpcLbClientStats>());
    811         // TODO(roth): We currently track this ref manually.  Once the
    812         // ClosureRef API is ready, we should pass the RefCountedPtr<> along
    813         // with the callback.
    814         auto self = lb_calld->Ref(DEBUG_LOCATION, "client_load_report");
    815         self.release();
    816         lb_calld->ScheduleNextClientLoadReportLocked();
    817       }
    818       if (grpc_grpclb_serverlist_equals(grpclb_policy->serverlist_,
    819                                         serverlist)) {
    820         if (grpc_lb_glb_trace.enabled()) {
    821           gpr_log(GPR_INFO,
    822                   "[grpclb %p] Incoming server list identical to current, "
    823                   "ignoring.",
    824                   grpclb_policy);
    825         }
    826         grpc_grpclb_destroy_serverlist(serverlist);
    827       } else { /* new serverlist */
    828         if (grpclb_policy->serverlist_ != nullptr) {
    829           /* dispose of the old serverlist */
    830           grpc_grpclb_destroy_serverlist(grpclb_policy->serverlist_);
    831         } else {
    832           /* or dispose of the fallback */
    833           grpc_lb_addresses_destroy(grpclb_policy->fallback_backend_addresses_);
    834           grpclb_policy->fallback_backend_addresses_ = nullptr;
    835           if (grpclb_policy->fallback_timer_callback_pending_) {
    836             grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
    837           }
    838         }
    839         // and update the copy in the GrpcLb instance. This
    840         // serverlist instance will be destroyed either upon the next
    841         // update or when the GrpcLb instance is destroyed.
    842         grpclb_policy->serverlist_ = serverlist;
    843         grpclb_policy->serverlist_index_ = 0;
    844         grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked();
    845       }
    846     } else {
    847       if (grpc_lb_glb_trace.enabled()) {
    848         gpr_log(GPR_INFO, "[grpclb %p] Received empty server list, ignoring.",
    849                 grpclb_policy);
    850       }
    851       grpc_grpclb_destroy_serverlist(serverlist);
    852     }
    853   } else {
    854     // No valid initial response or serverlist found.
    855     gpr_log(GPR_ERROR,
    856             "[grpclb %p] Invalid LB response received: '%s'. Ignoring.",
    857             grpclb_policy,
    858             grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX));
    859   }
    860   grpc_slice_unref_internal(response_slice);
    861   if (!grpclb_policy->shutting_down_) {
    862     // Keep listening for serverlist updates.
    863     grpc_op op;
    864     memset(&op, 0, sizeof(op));
    865     op.op = GRPC_OP_RECV_MESSAGE;
    866     op.data.recv_message.recv_message = &lb_calld->recv_message_payload_;
    867     op.flags = 0;
    868     op.reserved = nullptr;
    869     // Reuse the "OnBalancerMessageReceivedLocked" ref taken in StartQuery().
    870     const grpc_call_error call_error = grpc_call_start_batch_and_execute(
    871         lb_calld->lb_call_, &op, 1,
    872         &lb_calld->lb_on_balancer_message_received_);
    873     GPR_ASSERT(GRPC_CALL_OK == call_error);
    874   } else {
    875     lb_calld->Unref(DEBUG_LOCATION, "on_message_received+grpclb_shutdown");
    876   }
    877 }
    878 
    879 void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
    880     void* arg, grpc_error* error) {
    881   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
    882   GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
    883   GPR_ASSERT(lb_calld->lb_call_ != nullptr);
    884   if (grpc_lb_glb_trace.enabled()) {
    885     char* status_details =
    886         grpc_slice_to_c_string(lb_calld->lb_call_status_details_);
    887     gpr_log(GPR_INFO,
    888             "[grpclb %p] Status from LB server received. Status = %d, details "
    889             "= '%s', (lb_calld: %p, lb_call: %p), error '%s'",
    890             grpclb_policy, lb_calld->lb_call_status_, status_details, lb_calld,
    891             lb_calld->lb_call_, grpc_error_string(error));
    892     gpr_free(status_details);
    893   }
    894   grpclb_policy->TryReresolutionLocked(&grpc_lb_glb_trace, GRPC_ERROR_NONE);
    895   // If this lb_calld is still in use, this call ended because of a failure so
    896   // we want to retry connecting. Otherwise, we have deliberately ended this
    897   // call and no further action is required.
    898   if (lb_calld == grpclb_policy->lb_calld_.get()) {
    899     grpclb_policy->lb_calld_.reset();
    900     GPR_ASSERT(!grpclb_policy->shutting_down_);
    901     if (lb_calld->seen_initial_response_) {
    902       // If we lose connection to the LB server, reset the backoff and restart
    903       // the LB call immediately.
    904       grpclb_policy->lb_call_backoff_.Reset();
    905       grpclb_policy->StartBalancerCallLocked();
    906     } else {
    907       // If this LB call fails establishing any connection to the LB server,
    908       // retry later.
    909       grpclb_policy->StartBalancerCallRetryTimerLocked();
    910     }
    911   }
    912   lb_calld->Unref(DEBUG_LOCATION, "lb_call_ended");
    913 }
    914 
    915 //
    916 // helper code for creating balancer channel
    917 //
    918 
    919 grpc_lb_addresses* ExtractBalancerAddresses(
    920     const grpc_lb_addresses* addresses) {
    921   size_t num_grpclb_addrs = 0;
    922   for (size_t i = 0; i < addresses->num_addresses; ++i) {
    923     if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
    924   }
    925   // There must be at least one balancer address, or else the
    926   // client_channel would not have chosen this LB policy.
    927   GPR_ASSERT(num_grpclb_addrs > 0);
    928   grpc_lb_addresses* lb_addresses =
    929       grpc_lb_addresses_create(num_grpclb_addrs, nullptr);
    930   size_t lb_addresses_idx = 0;
    931   for (size_t i = 0; i < addresses->num_addresses; ++i) {
    932     if (!addresses->addresses[i].is_balancer) continue;
    933     if (GPR_UNLIKELY(addresses->addresses[i].user_data != nullptr)) {
    934       gpr_log(GPR_ERROR,
    935               "This LB policy doesn't support user data. It will be ignored");
    936     }
    937     grpc_lb_addresses_set_address(
    938         lb_addresses, lb_addresses_idx++, addresses->addresses[i].address.addr,
    939         addresses->addresses[i].address.len, false /* is balancer */,
    940         addresses->addresses[i].balancer_name, nullptr /* user data */);
    941   }
    942   GPR_ASSERT(num_grpclb_addrs == lb_addresses_idx);
    943   return lb_addresses;
    944 }
    945 
    946 /* Returns the channel args for the LB channel, used to create a bidirectional
    947  * stream for the reception of load balancing updates.
    948  *
    949  * Inputs:
    950  *   - \a addresses: corresponding to the balancers.
    951  *   - \a response_generator: in order to propagate updates from the resolver
    952  *   above the grpclb policy.
    953  *   - \a args: other args inherited from the grpclb policy. */
    954 grpc_channel_args* BuildBalancerChannelArgs(
    955     const grpc_lb_addresses* addresses,
    956     FakeResolverResponseGenerator* response_generator,
    957     const grpc_channel_args* args) {
    958   grpc_lb_addresses* lb_addresses = ExtractBalancerAddresses(addresses);
    959   // Channel args to remove.
    960   static const char* args_to_remove[] = {
    961       // LB policy name, since we want to use the default (pick_first) in
    962       // the LB channel.
    963       GRPC_ARG_LB_POLICY_NAME,
    964       // The channel arg for the server URI, since that will be different for
    965       // the LB channel than for the parent channel.  The client channel
    966       // factory will re-add this arg with the right value.
    967       GRPC_ARG_SERVER_URI,
    968       // The resolved addresses, which will be generated by the name resolver
    969       // used in the LB channel.  Note that the LB channel will use the fake
    970       // resolver, so this won't actually generate a query to DNS (or some
    971       // other name service).  However, the addresses returned by the fake
    972       // resolver will have is_balancer=false, whereas our own addresses have
    973       // is_balancer=true.  We need the LB channel to return addresses with
    974       // is_balancer=false so that it does not wind up recursively using the
    975       // grpclb LB policy, as per the special case logic in client_channel.c.
    976       GRPC_ARG_LB_ADDRESSES,
    977       // The fake resolver response generator, because we are replacing it
    978       // with the one from the grpclb policy, used to propagate updates to
    979       // the LB channel.
    980       GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
    981       // The LB channel should use the authority indicated by the target
    982       // authority table (see \a grpc_lb_policy_grpclb_modify_lb_channel_args),
    983       // as opposed to the authority from the parent channel.
    984       GRPC_ARG_DEFAULT_AUTHORITY,
    985       // Just as for \a GRPC_ARG_DEFAULT_AUTHORITY, the LB channel should be
    986       // treated as a stand-alone channel and not inherit this argument from the
    987       // args of the parent channel.
    988       GRPC_SSL_TARGET_NAME_OVERRIDE_ARG,
    989   };
    990   // Channel args to add.
    991   const grpc_arg args_to_add[] = {
    992       // New LB addresses.
    993       // Note that we pass these in both when creating the LB channel
    994       // and via the fake resolver.  The latter is what actually gets used.
    995       grpc_lb_addresses_create_channel_arg(lb_addresses),
    996       // The fake resolver response generator, which we use to inject
    997       // address updates into the LB channel.
    998       grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
    999           response_generator),
   1000       // A channel arg indicating the target is a grpclb load balancer.
   1001       grpc_channel_arg_integer_create(
   1002           const_cast<char*>(GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER), 1),
    1003       // A channel arg indicating this is an internal channel, i.e., it is
   1004       // owned by components in Core, not by the user application.
   1005       grpc_channel_arg_integer_create(
   1006           const_cast<char*>(GRPC_ARG_CHANNELZ_CHANNEL_IS_INTERNAL_CHANNEL), 1),
   1007   };
   1008   // Construct channel args.
   1009   grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
   1010       args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add,
   1011       GPR_ARRAY_SIZE(args_to_add));
   1012   // Make any necessary modifications for security.
   1013   new_args = grpc_lb_policy_grpclb_modify_lb_channel_args(new_args);
   1014   // Clean up.
   1015   grpc_lb_addresses_destroy(lb_addresses);
   1016   return new_args;
   1017 }
   1018 
   1019 //
   1020 // ctor and dtor
   1021 //
   1022 
   1023 GrpcLb::GrpcLb(const grpc_lb_addresses* addresses,
   1024                const LoadBalancingPolicy::Args& args)
   1025     : LoadBalancingPolicy(args),
   1026       response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()),
   1027       lb_call_backoff_(
   1028           BackOff::Options()
   1029               .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS *
   1030                                    1000)
   1031               .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
   1032               .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
   1033               .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS *
   1034                                1000)) {
   1035   // Initialization.
   1036   gpr_mu_init(&lb_channel_mu_);
   1037   grpc_subchannel_index_ref();
   1038   GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
   1039                     &GrpcLb::OnBalancerChannelConnectivityChangedLocked, this,
   1040                     grpc_combiner_scheduler(args.combiner));
   1041   GRPC_CLOSURE_INIT(&on_rr_connectivity_changed_,
   1042                     &GrpcLb::OnRoundRobinConnectivityChangedLocked, this,
   1043                     grpc_combiner_scheduler(args.combiner));
   1044   GRPC_CLOSURE_INIT(&on_rr_request_reresolution_,
   1045                     &GrpcLb::OnRoundRobinRequestReresolutionLocked, this,
   1046                     grpc_combiner_scheduler(args.combiner));
   1047   grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, "grpclb");
   1048   // Record server name.
   1049   const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI);
   1050   const char* server_uri = grpc_channel_arg_get_string(arg);
   1051   GPR_ASSERT(server_uri != nullptr);
   1052   grpc_uri* uri = grpc_uri_parse(server_uri, true);
   1053   GPR_ASSERT(uri->path[0] != '\0');
   1054   server_name_ = gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
   1055   if (grpc_lb_glb_trace.enabled()) {
   1056     gpr_log(GPR_INFO,
   1057             "[grpclb %p] Will use '%s' as the server name for LB request.",
   1058             this, server_name_);
   1059   }
   1060   grpc_uri_destroy(uri);
   1061   // Record LB call timeout.
   1062   arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS);
   1063   lb_call_timeout_ms_ = grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX});
   1064   // Record fallback timeout.
   1065   arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS);
   1066   lb_fallback_timeout_ms_ = grpc_channel_arg_get_integer(
   1067       arg, {GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX});
   1068   // Process channel args.
   1069   ProcessChannelArgsLocked(*args.args);
   1070 }
   1071 
   1072 GrpcLb::~GrpcLb() {
   1073   GPR_ASSERT(pending_picks_ == nullptr);
   1074   gpr_mu_destroy(&lb_channel_mu_);
   1075   gpr_free((void*)server_name_);
   1076   grpc_channel_args_destroy(args_);
   1077   grpc_connectivity_state_destroy(&state_tracker_);
   1078   if (serverlist_ != nullptr) {
   1079     grpc_grpclb_destroy_serverlist(serverlist_);
   1080   }
   1081   if (fallback_backend_addresses_ != nullptr) {
   1082     grpc_lb_addresses_destroy(fallback_backend_addresses_);
   1083   }
   1084   grpc_subchannel_index_unref();
   1085 }
   1086 
   1087 void GrpcLb::ShutdownLocked() {
   1088   grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
   1089   shutting_down_ = true;
   1090   lb_calld_.reset();
   1091   if (retry_timer_callback_pending_) {
   1092     grpc_timer_cancel(&lb_call_retry_timer_);
   1093   }
   1094   if (fallback_timer_callback_pending_) {
   1095     grpc_timer_cancel(&lb_fallback_timer_);
   1096   }
   1097   rr_policy_.reset();
   1098   TryReresolutionLocked(&grpc_lb_glb_trace, GRPC_ERROR_CANCELLED);
   1099   // We destroy the LB channel here instead of in our destructor because
   1100   // destroying the channel triggers a last callback to
   1101   // OnBalancerChannelConnectivityChangedLocked(), and we need to be
   1102   // alive when that callback is invoked.
   1103   if (lb_channel_ != nullptr) {
   1104     gpr_mu_lock(&lb_channel_mu_);
   1105     grpc_channel_destroy(lb_channel_);
   1106     lb_channel_ = nullptr;
   1107     gpr_mu_unlock(&lb_channel_mu_);
   1108   }
   1109   grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
   1110                               GRPC_ERROR_REF(error), "grpclb_shutdown");
   1111   // Clear pending picks.
   1112   PendingPick* pp;
   1113   while ((pp = pending_picks_) != nullptr) {
   1114     pending_picks_ = pp->next;
   1115     pp->pick->connected_subchannel.reset();
   1116     // Note: pp is deleted in this callback.
   1117     GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error));
   1118   }
   1119   GRPC_ERROR_UNREF(error);
   1120 }
   1121 
   1122 //
   1123 // public methods
   1124 //
   1125 
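         // Drains grpclb's pending picks and re-issues each of them against
         // \a new_policy. Picks that the new policy completes synchronously get
         // their original completion closures scheduled here.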
   1126 void GrpcLb::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
   1127   PendingPick* pp;
   1128   while ((pp = pending_picks_) != nullptr) {
   1129     pending_picks_ = pp->next;
   1130     pp->pick->on_complete = pp->original_on_complete;
   1131     pp->pick->user_data = nullptr;
   1132     grpc_error* error = GRPC_ERROR_NONE;
   1133     if (new_policy->PickLocked(pp->pick, &error)) {
   1134       // Synchronous return; schedule closure.
   1135       GRPC_CLOSURE_SCHED(pp->pick->on_complete, error);
   1136     }
   1137     Delete(pp);
   1138   }
   1139 }
   1140 
   1141 // Cancel a specific pending pick.
   1142 //
   1143 // A grpclb pick progresses as follows:
   1144 // - If there's a Round Robin policy (rr_policy_) available, it'll be
   1145 //   handed over to the RR policy (in CreateRoundRobinPolicyLocked()). From
   1146 //   that point onwards, it'll be RR's responsibility. For cancellations, that
    1147 //   implies the pick also needs to be cancelled by the RR instance.
   1148 // - Otherwise, without an RR instance, picks stay pending at this policy's
   1149 //   level (grpclb), inside the pending_picks_ list. To cancel these,
   1150 //   we invoke the completion closure and set the pick's connected
   1151 //   subchannel to nullptr right here.
   1152 void GrpcLb::CancelPickLocked(PickState* pick, grpc_error* error) {
   1153   PendingPick* pp = pending_picks_;
   1154   pending_picks_ = nullptr;
   1155   while (pp != nullptr) {
   1156     PendingPick* next = pp->next;
   1157     if (pp->pick == pick) {
   1158       pick->connected_subchannel.reset();
   1159       // Note: pp is deleted in this callback.
   1160       GRPC_CLOSURE_SCHED(&pp->on_complete,
   1161                          GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
   1162                              "Pick Cancelled", &error, 1));
   1163     } else {
   1164       pp->next = pending_picks_;
   1165       pending_picks_ = pp;
   1166     }
   1167     pp = next;
   1168   }
   1169   if (rr_policy_ != nullptr) {
   1170     rr_policy_->CancelPickLocked(pick, GRPC_ERROR_REF(error));
   1171   }
   1172   GRPC_ERROR_UNREF(error);
   1173 }
   1174 
   1175 // Cancel all pending picks.
   1176 //
   1177 // A grpclb pick progresses as follows:
   1178 // - If there's a Round Robin policy (rr_policy_) available, it'll be
   1179 //   handed over to the RR policy (in CreateRoundRobinPolicyLocked()). From
   1180 //   that point onwards, it'll be RR's responsibility. For cancellations, that
    1181 //   implies the pick also needs to be cancelled by the RR instance.
   1182 // - Otherwise, without an RR instance, picks stay pending at this policy's
   1183 //   level (grpclb), inside the pending_picks_ list. To cancel these,
   1184 //   we invoke the completion closure and set the pick's connected
   1185 //   subchannel to nullptr right here.
   1186 void GrpcLb::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
   1187                                        uint32_t initial_metadata_flags_eq,
   1188                                        grpc_error* error) {
   1189   PendingPick* pp = pending_picks_;
   1190   pending_picks_ = nullptr;
   1191   while (pp != nullptr) {
   1192     PendingPick* next = pp->next;
   1193     if ((pp->pick->initial_metadata_flags & initial_metadata_flags_mask) ==
   1194         initial_metadata_flags_eq) {
   1195       // Note: pp is deleted in this callback.
   1196       GRPC_CLOSURE_SCHED(&pp->on_complete,
   1197                          GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
   1198                              "Pick Cancelled", &error, 1));
   1199     } else {
   1200       pp->next = pending_picks_;
   1201       pending_picks_ = pp;
   1202     }
   1203     pp = next;
   1204   }
   1205   if (rr_policy_ != nullptr) {
   1206     rr_policy_->CancelMatchingPicksLocked(initial_metadata_flags_mask,
   1207                                           initial_metadata_flags_eq,
   1208                                           GRPC_ERROR_REF(error));
   1209   }
   1210   GRPC_ERROR_UNREF(error);
   1211 }
   1212 
   1213 void GrpcLb::ExitIdleLocked() {
   1214   if (!started_picking_) {
   1215     StartPickingLocked();
   1216   }
   1217 }
   1218 
   1219 void GrpcLb::ResetBackoffLocked() {
   1220   if (lb_channel_ != nullptr) {
   1221     grpc_channel_reset_connect_backoff(lb_channel_);
   1222   }
   1223   if (rr_policy_ != nullptr) {
   1224     rr_policy_->ResetBackoffLocked();
   1225   }
   1226 }
   1227 
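         // Attempts a pick. If the RR policy is available, the pick is forwarded to
         // it immediately. Otherwise the pick either fails (if no completion closure
         // was provided) or is queued in pending_picks_, starting the balancer
         // machinery if it has not been started yet.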
   1228 bool GrpcLb::PickLocked(PickState* pick, grpc_error** error) {
   1229   PendingPick* pp = PendingPickCreate(pick);
   1230   bool pick_done = false;
   1231   if (rr_policy_ != nullptr) {
   1232     if (grpc_lb_glb_trace.enabled()) {
   1233       gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", this,
   1234               rr_policy_.get());
   1235     }
   1236     pick_done =
   1237         PickFromRoundRobinPolicyLocked(false /* force_async */, pp, error);
   1238   } else {  // rr_policy_ == NULL
   1239     if (pick->on_complete == nullptr) {
   1240       *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
   1241           "No pick result available but synchronous result required.");
   1242       pick_done = true;
   1243     } else {
   1244       if (grpc_lb_glb_trace.enabled()) {
   1245         gpr_log(GPR_INFO,
   1246                 "[grpclb %p] No RR policy. Adding to grpclb's pending picks",
   1247                 this);
   1248       }
   1249       AddPendingPick(pp);
   1250       if (!started_picking_) {
   1251         StartPickingLocked();
   1252       }
   1253       pick_done = false;
   1254     }
   1255   }
   1256   return pick_done;
   1257 }
   1258 
   1259 void GrpcLb::FillChildRefsForChannelz(ChildRefsList* child_subchannels,
   1260                                       ChildRefsList* child_channels) {
    1261   // Delegate to the RoundRobin policy to fill the child subchannels.
    1262   // The RR policy may not exist yet (e.g., before the first serverlist or
           // fallback update), so guard against a null pointer.
           if (rr_policy_ != nullptr) {
             rr_policy_->FillChildRefsForChannelz(child_subchannels, child_channels);
           }
   1263   MutexLock lock(&lb_channel_mu_);
   1264   if (lb_channel_ != nullptr) {
   1265     grpc_core::channelz::ChannelNode* channel_node =
   1266         grpc_channel_get_channelz_node(lb_channel_);
   1267     if (channel_node != nullptr) {
   1268       child_channels->push_back(channel_node->uuid());
   1269     }
   1270   }
   1271 }
   1272 
   1273 grpc_connectivity_state GrpcLb::CheckConnectivityLocked(
   1274     grpc_error** connectivity_error) {
   1275   return grpc_connectivity_state_get(&state_tracker_, connectivity_error);
   1276 }
   1277 
   1278 void GrpcLb::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
   1279                                        grpc_closure* notify) {
   1280   grpc_connectivity_state_notify_on_state_change(&state_tracker_, current,
   1281                                                  notify);
   1282 }
   1283 
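         // Processes channel args from a resolver update: refreshes the fallback
         // backend address list, rewrites the args to force GRPC_ARG_LB_POLICY_NAME
         // to "grpclb", creates the balancer channel on first use, and pushes the
         // balancer addresses through the fake resolver's response generator.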
   1284 void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
   1285   const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
   1286   if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) {
   1287     // Ignore this update.
   1288     gpr_log(
   1289         GPR_ERROR,
   1290         "[grpclb %p] No valid LB addresses channel arg in update, ignoring.",
   1291         this);
   1292     return;
   1293   }
   1294   const grpc_lb_addresses* addresses =
   1295       static_cast<const grpc_lb_addresses*>(arg->value.pointer.p);
   1296   // Update fallback address list.
   1297   if (fallback_backend_addresses_ != nullptr) {
   1298     grpc_lb_addresses_destroy(fallback_backend_addresses_);
   1299   }
   1300   fallback_backend_addresses_ = ExtractBackendAddresses(addresses);
   1301   // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
   1302   // since we use this to trigger the client_load_reporting filter.
   1303   static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
   1304   grpc_arg new_arg = grpc_channel_arg_string_create(
   1305       (char*)GRPC_ARG_LB_POLICY_NAME, (char*)"grpclb");
   1306   grpc_channel_args_destroy(args_);
   1307   args_ = grpc_channel_args_copy_and_add_and_remove(
   1308       &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
   1309   // Construct args for balancer channel.
   1310   grpc_channel_args* lb_channel_args =
   1311       BuildBalancerChannelArgs(addresses, response_generator_.get(), &args);
   1312   // Create balancer channel if needed.
   1313   if (lb_channel_ == nullptr) {
   1314     char* uri_str;
   1315     gpr_asprintf(&uri_str, "fake:///%s", server_name_);
   1316     gpr_mu_lock(&lb_channel_mu_);
   1317     lb_channel_ = grpc_client_channel_factory_create_channel(
   1318         client_channel_factory(), uri_str,
   1319         GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, lb_channel_args);
   1320     gpr_mu_unlock(&lb_channel_mu_);
   1321     GPR_ASSERT(lb_channel_ != nullptr);
   1322     gpr_free(uri_str);
   1323   }
   1324   // Propagate updates to the LB channel (pick_first) through the fake
   1325   // resolver.
   1326   response_generator_->SetResponse(lb_channel_args);
   1327   grpc_channel_args_destroy(lb_channel_args);
   1328 }
   1329 
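         // Handles an update of resolved addresses / channel args from the resolver.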
   1330 void GrpcLb::UpdateLocked(const grpc_channel_args& args) {
   1331   ProcessChannelArgsLocked(args);
   1332   // If fallback is configured and the RR policy already exists, update
   1333   // it with the new fallback addresses.
   1334   if (lb_fallback_timeout_ms_ > 0 && rr_policy_ != nullptr) {
   1335     CreateOrUpdateRoundRobinPolicyLocked();
   1336   }
    1337   // Start watching the LB channel's connectivity (requesting a connection
    1338   // attempt), if not already doing so.
   1339   if (!watching_lb_channel_) {
   1340     lb_channel_connectivity_ = grpc_channel_check_connectivity_state(
   1341         lb_channel_, true /* try to connect */);
   1342     grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
   1343         grpc_channel_get_channel_stack(lb_channel_));
   1344     GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
   1345     watching_lb_channel_ = true;
   1346     // TODO(roth): We currently track this ref manually.  Once the
   1347     // ClosureRef API is ready, we should pass the RefCountedPtr<> along
   1348     // with the callback.
   1349     auto self = Ref(DEBUG_LOCATION, "watch_lb_channel_connectivity");
   1350     self.release();
   1351     grpc_client_channel_watch_connectivity_state(
   1352         client_channel_elem,
   1353         grpc_polling_entity_create_from_pollset_set(interested_parties()),
   1354         &lb_channel_connectivity_, &lb_channel_on_connectivity_changed_,
   1355         nullptr);
   1356   }
   1357 }
   1358 
   1359 //
   1360 // code for balancer channel and call
   1361 //
   1362 
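         // Kicks off load balancing: arms the fallback timer (when fallback is
         // configured and no serverlist has been received yet) and starts the
         // balancer call.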
   1363 void GrpcLb::StartPickingLocked() {
   1364   // Start a timer to fall back.
   1365   if (lb_fallback_timeout_ms_ > 0 && serverlist_ == nullptr &&
   1366       !fallback_timer_callback_pending_) {
   1367     grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_;
   1368     // TODO(roth): We currently track this ref manually.  Once the
   1369     // ClosureRef API is ready, we should pass the RefCountedPtr<> along
   1370     // with the callback.
   1371     auto self = Ref(DEBUG_LOCATION, "on_fallback_timer");
   1372     self.release();
   1373     GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimerLocked, this,
   1374                       grpc_combiner_scheduler(combiner()));
   1375     fallback_timer_callback_pending_ = true;
   1376     grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
   1377   }
   1378   started_picking_ = true;
   1379   StartBalancerCallLocked();
   1380 }
   1381 
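         // Creates a new BalancerCallState and starts the streaming query against
         // the balancer, unless we are shutting down.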
   1382 void GrpcLb::StartBalancerCallLocked() {
   1383   GPR_ASSERT(lb_channel_ != nullptr);
   1384   if (shutting_down_) return;
   1385   // Init the LB call data.
   1386   GPR_ASSERT(lb_calld_ == nullptr);
   1387   lb_calld_ = MakeOrphanable<BalancerCallState>(Ref());
   1388   if (grpc_lb_glb_trace.enabled()) {
   1389     gpr_log(GPR_INFO,
   1390             "[grpclb %p] Query for backends (lb_channel: %p, lb_calld: %p)",
   1391             this, lb_channel_, lb_calld_.get());
   1392   }
   1393   lb_calld_->StartQuery();
   1394 }
   1395 
   1396 void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
   1397   GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
   1398   grpclb_policy->fallback_timer_callback_pending_ = false;
   1399   // If we receive a serverlist after the timer fires but before this callback
   1400   // actually runs, don't fall back.
   1401   if (grpclb_policy->serverlist_ == nullptr && !grpclb_policy->shutting_down_ &&
   1402       error == GRPC_ERROR_NONE) {
   1403     if (grpc_lb_glb_trace.enabled()) {
   1404       gpr_log(GPR_INFO,
   1405               "[grpclb %p] Falling back to use backends from resolver",
   1406               grpclb_policy);
   1407     }
   1408     GPR_ASSERT(grpclb_policy->fallback_backend_addresses_ != nullptr);
   1409     grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked();
   1410   }
   1411   grpclb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer");
   1412 }
   1413 
   1414 void GrpcLb::StartBalancerCallRetryTimerLocked() {
   1415   grpc_millis next_try = lb_call_backoff_.NextAttemptTime();
   1416   if (grpc_lb_glb_trace.enabled()) {
   1417     gpr_log(GPR_INFO, "[grpclb %p] Connection to LB server lost...", this);
   1418     grpc_millis timeout = next_try - ExecCtx::Get()->Now();
   1419     if (timeout > 0) {
   1420       gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active in %" PRId64 "ms.",
   1421               this, timeout);
   1422     } else {
   1423       gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active immediately.",
   1424               this);
   1425     }
   1426   }
   1427   // TODO(roth): We currently track this ref manually.  Once the
   1428   // ClosureRef API is ready, we should pass the RefCountedPtr<> along
   1429   // with the callback.
   1430   auto self = Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
   1431   self.release();
   1432   GRPC_CLOSURE_INIT(&lb_on_call_retry_, &GrpcLb::OnBalancerCallRetryTimerLocked,
   1433                     this, grpc_combiner_scheduler(combiner()));
   1434   retry_timer_callback_pending_ = true;
   1435   grpc_timer_init(&lb_call_retry_timer_, next_try, &lb_on_call_retry_);
   1436 }
   1437 
   1438 void GrpcLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) {
   1439   GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
   1440   grpclb_policy->retry_timer_callback_pending_ = false;
   1441   if (!grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE &&
   1442       grpclb_policy->lb_calld_ == nullptr) {
   1443     if (grpc_lb_glb_trace.enabled()) {
   1444       gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server",
   1445               grpclb_policy);
   1446     }
   1447     grpclb_policy->StartBalancerCallLocked();
   1448   }
   1449   grpclb_policy->Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
   1450 }
   1451 
    1452 // Invoked as part of the update process. Keeps watching the LB channel until
    1453 // the channel shuts down or becomes READY. It's invoked even if the LB channel
    1454 // stayed READY throughout the update (e.g., if the update is identical).
   1455 void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
   1456                                                         grpc_error* error) {
   1457   GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
   1458   if (grpclb_policy->shutting_down_) goto done;
   1459   // Re-initialize the lb_call. This should also take care of updating the
   1460   // embedded RR policy. Note that the current RR policy, if any, will stay in
   1461   // effect until an update from the new lb_call is received.
   1462   switch (grpclb_policy->lb_channel_connectivity_) {
   1463     case GRPC_CHANNEL_CONNECTING:
   1464     case GRPC_CHANNEL_TRANSIENT_FAILURE: {
   1465       // Keep watching the LB channel.
   1466       grpc_channel_element* client_channel_elem =
   1467           grpc_channel_stack_last_element(
   1468               grpc_channel_get_channel_stack(grpclb_policy->lb_channel_));
   1469       GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
   1470       grpc_client_channel_watch_connectivity_state(
   1471           client_channel_elem,
   1472           grpc_polling_entity_create_from_pollset_set(
   1473               grpclb_policy->interested_parties()),
   1474           &grpclb_policy->lb_channel_connectivity_,
   1475           &grpclb_policy->lb_channel_on_connectivity_changed_, nullptr);
   1476       break;
   1477     }
    1478       // The LB channel may be IDLE because it was shut down before the update.
   1479       // Restart the LB call to kick the LB channel into gear.
   1480     case GRPC_CHANNEL_IDLE:
   1481     case GRPC_CHANNEL_READY:
   1482       grpclb_policy->lb_calld_.reset();
   1483       if (grpclb_policy->started_picking_) {
   1484         if (grpclb_policy->retry_timer_callback_pending_) {
   1485           grpc_timer_cancel(&grpclb_policy->lb_call_retry_timer_);
   1486         }
   1487         grpclb_policy->lb_call_backoff_.Reset();
   1488         grpclb_policy->StartBalancerCallLocked();
   1489       }
   1490       // Fall through.
   1491     case GRPC_CHANNEL_SHUTDOWN:
   1492     done:
   1493       grpclb_policy->watching_lb_channel_ = false;
   1494       grpclb_policy->Unref(DEBUG_LOCATION,
   1495                            "watch_lb_channel_connectivity_cb_shutdown");
   1496   }
   1497 }
   1498 
   1499 //
   1500 // PendingPick
   1501 //
   1502 
   1503 // Adds lb_token of selected subchannel (address) to the call's initial
   1504 // metadata.
   1505 grpc_error* AddLbTokenToInitialMetadata(
   1506     grpc_mdelem lb_token, grpc_linked_mdelem* lb_token_mdelem_storage,
   1507     grpc_metadata_batch* initial_metadata) {
   1508   GPR_ASSERT(lb_token_mdelem_storage != nullptr);
   1509   GPR_ASSERT(!GRPC_MDISNULL(lb_token));
   1510   return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
   1511                                       lb_token);
   1512 }
   1513 
   1514 // Destroy function used when embedding client stats in call context.
   1515 void DestroyClientStats(void* arg) {
   1516   static_cast<GrpcLbClientStats*>(arg)->Unref();
   1517 }
   1518 
   1519 void GrpcLb::PendingPickSetMetadataAndContext(PendingPick* pp) {
    1520   /* If connected_subchannel is nullptr, no pick has been made by the RR
    1521    * policy (e.g., all addresses failed to connect). There won't be any
    1522    * user_data/token available. */
   1523   if (pp->pick->connected_subchannel != nullptr) {
   1524     if (GPR_LIKELY(!GRPC_MDISNULL(pp->lb_token))) {
   1525       AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(pp->lb_token),
   1526                                   &pp->pick->lb_token_mdelem_storage,
   1527                                   pp->pick->initial_metadata);
   1528     } else {
   1529       gpr_log(GPR_ERROR,
   1530               "[grpclb %p] No LB token for connected subchannel pick %p",
   1531               pp->grpclb_policy, pp->pick);
   1532       abort();
   1533     }
   1534     // Pass on client stats via context. Passes ownership of the reference.
   1535     if (pp->client_stats != nullptr) {
   1536       pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value =
   1537           pp->client_stats.release();
   1538       pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy =
   1539           DestroyClientStats;
   1540     }
   1541   } else {
   1542     pp->client_stats.reset();
   1543   }
   1544 }
   1545 
    1546 /* The \a on_complete closure passed as part of the pick is wrapped so that,
    1547  * before the original closure runs, the LB token can be added to the call's
    1548  * initial metadata and the client stats can be passed via the call context. */
   1549 void GrpcLb::OnPendingPickComplete(void* arg, grpc_error* error) {
   1550   PendingPick* pp = static_cast<PendingPick*>(arg);
   1551   PendingPickSetMetadataAndContext(pp);
   1552   GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_REF(error));
   1553   Delete(pp);
   1554 }
   1555 
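         // Wraps \a pick in a PendingPick, replacing the pick's completion closure
         // with OnPendingPickComplete() so that the LB token and client stats can be
         // attached before the original closure runs.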
   1556 GrpcLb::PendingPick* GrpcLb::PendingPickCreate(PickState* pick) {
   1557   PendingPick* pp = New<PendingPick>();
   1558   pp->grpclb_policy = this;
   1559   pp->pick = pick;
   1560   GRPC_CLOSURE_INIT(&pp->on_complete, &GrpcLb::OnPendingPickComplete, pp,
   1561                     grpc_schedule_on_exec_ctx);
   1562   pp->original_on_complete = pick->on_complete;
   1563   pick->on_complete = &pp->on_complete;
   1564   return pp;
   1565 }
   1566 
   1567 void GrpcLb::AddPendingPick(PendingPick* pp) {
   1568   pp->next = pending_picks_;
   1569   pending_picks_ = pp;
   1570 }
   1571 
   1572 //
   1573 // code for interacting with the RR policy
   1574 //
   1575 
    1576 // Performs a pick over \a rr_policy_. Given that a pick can complete
    1577 // immediately (without invoking its completion callback), we need to perform
    1578 // the cleanups that callback would otherwise be responsible for.
   1579 // If \a force_async is true, then we will manually schedule the
   1580 // completion callback even if the pick is available immediately.
   1581 bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp,
   1582                                             grpc_error** error) {
   1583   // Check for drops if we are not using fallback backend addresses.
   1584   if (serverlist_ != nullptr) {
   1585     // Look at the index into the serverlist to see if we should drop this call.
   1586     grpc_grpclb_server* server = serverlist_->servers[serverlist_index_++];
   1587     if (serverlist_index_ == serverlist_->num_servers) {
   1588       serverlist_index_ = 0;  // Wrap-around.
   1589     }
   1590     if (server->drop) {
   1591       // Update client load reporting stats to indicate the number of
   1592       // dropped calls.  Note that we have to do this here instead of in
   1593       // the client_load_reporting filter, because we do not create a
   1594       // subchannel call (and therefore no client_load_reporting filter)
   1595       // for dropped calls.
   1596       if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) {
   1597         lb_calld_->client_stats()->AddCallDroppedLocked(
   1598             server->load_balance_token);
   1599       }
   1600       if (force_async) {
   1601         GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE);
   1602         Delete(pp);
   1603         return false;
   1604       }
   1605       Delete(pp);
   1606       return true;
   1607     }
   1608   }
   1609   // Set client_stats and user_data.
   1610   if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) {
   1611     pp->client_stats = lb_calld_->client_stats()->Ref();
   1612   }
   1613   GPR_ASSERT(pp->pick->user_data == nullptr);
   1614   pp->pick->user_data = (void**)&pp->lb_token;
   1615   // Pick via the RR policy.
   1616   bool pick_done = rr_policy_->PickLocked(pp->pick, error);
   1617   if (pick_done) {
   1618     PendingPickSetMetadataAndContext(pp);
   1619     if (force_async) {
   1620       GRPC_CLOSURE_SCHED(pp->original_on_complete, *error);
   1621       *error = GRPC_ERROR_NONE;
   1622       pick_done = false;
   1623     }
   1624     Delete(pp);
   1625   }
   1626   // else, the pending pick will be registered and taken care of by the
   1627   // pending pick list inside the RR policy.  Eventually,
   1628   // OnPendingPickComplete() will be called, which will (among other
   1629   // things) add the LB token to the call's initial metadata.
   1630   return pick_done;
   1631 }
   1632 
   1633 void GrpcLb::CreateRoundRobinPolicyLocked(const Args& args) {
   1634   GPR_ASSERT(rr_policy_ == nullptr);
   1635   rr_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
   1636       "round_robin", args);
   1637   if (GPR_UNLIKELY(rr_policy_ == nullptr)) {
   1638     gpr_log(GPR_ERROR, "[grpclb %p] Failure creating a RoundRobin policy",
   1639             this);
   1640     return;
   1641   }
   1642   // TODO(roth): We currently track this ref manually.  Once the new
   1643   // ClosureRef API is done, pass the RefCountedPtr<> along with the closure.
   1644   auto self = Ref(DEBUG_LOCATION, "on_rr_reresolution_requested");
   1645   self.release();
   1646   rr_policy_->SetReresolutionClosureLocked(&on_rr_request_reresolution_);
   1647   grpc_error* rr_state_error = nullptr;
   1648   rr_connectivity_state_ = rr_policy_->CheckConnectivityLocked(&rr_state_error);
    1649   // The connectivity state is a function of the RR policy just updated/created.
   1650   UpdateConnectivityStateFromRoundRobinPolicyLocked(rr_state_error);
   1651   // Add the gRPC LB's interested_parties pollset_set to that of the newly
   1652   // created RR policy. This will make the RR policy progress upon activity on
   1653   // gRPC LB, which in turn is tied to the application's call.
   1654   grpc_pollset_set_add_pollset_set(rr_policy_->interested_parties(),
   1655                                    interested_parties());
   1656   // Subscribe to changes to the connectivity of the new RR.
   1657   // TODO(roth): We currently track this ref manually.  Once the new
   1658   // ClosureRef API is done, pass the RefCountedPtr<> along with the closure.
   1659   self = Ref(DEBUG_LOCATION, "on_rr_connectivity_changed");
   1660   self.release();
   1661   rr_policy_->NotifyOnStateChangeLocked(&rr_connectivity_state_,
   1662                                         &on_rr_connectivity_changed_);
   1663   rr_policy_->ExitIdleLocked();
   1664   // Send pending picks to RR policy.
   1665   PendingPick* pp;
   1666   while ((pp = pending_picks_)) {
   1667     pending_picks_ = pp->next;
   1668     if (grpc_lb_glb_trace.enabled()) {
   1669       gpr_log(GPR_INFO,
   1670               "[grpclb %p] Pending pick about to (async) PICK from RR %p", this,
   1671               rr_policy_.get());
   1672     }
   1673     grpc_error* error = GRPC_ERROR_NONE;
   1674     PickFromRoundRobinPolicyLocked(true /* force_async */, pp, &error);
   1675   }
   1676 }
   1677 
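         // Builds the channel args for the RR policy: the LB addresses come from the
         // balancer's serverlist when available, or from the resolver-provided
         // fallback backends otherwise. A channel arg records whether the backends
         // came from a grpclb balancer.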
   1678 grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() {
   1679   grpc_lb_addresses* addresses;
   1680   bool is_backend_from_grpclb_load_balancer = false;
   1681   if (serverlist_ != nullptr) {
   1682     GPR_ASSERT(serverlist_->num_servers > 0);
   1683     addresses = ProcessServerlist(serverlist_);
   1684     is_backend_from_grpclb_load_balancer = true;
   1685   } else {
   1686     // If CreateOrUpdateRoundRobinPolicyLocked() is invoked when we haven't
   1687     // received any serverlist from the balancer, we use the fallback backends
   1688     // returned by the resolver. Note that the fallback backend list may be
   1689     // empty, in which case the new round_robin policy will keep the requested
   1690     // picks pending.
   1691     GPR_ASSERT(fallback_backend_addresses_ != nullptr);
   1692     addresses = grpc_lb_addresses_copy(fallback_backend_addresses_);
   1693   }
   1694   GPR_ASSERT(addresses != nullptr);
   1695   // Replace the LB addresses in the channel args that we pass down to
   1696   // the subchannel.
   1697   static const char* keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES};
   1698   const grpc_arg args_to_add[] = {
   1699       grpc_lb_addresses_create_channel_arg(addresses),
   1700       // A channel arg indicating if the target is a backend inferred from a
   1701       // grpclb load balancer.
   1702       grpc_channel_arg_integer_create(
   1703           const_cast<char*>(
   1704               GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER),
   1705           is_backend_from_grpclb_load_balancer),
   1706   };
   1707   grpc_channel_args* args = grpc_channel_args_copy_and_add_and_remove(
   1708       args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add,
   1709       GPR_ARRAY_SIZE(args_to_add));
   1710   grpc_lb_addresses_destroy(addresses);
   1711   return args;
   1712 }
   1713 
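         // Creates the RR policy if it does not exist yet; otherwise updates the
         // existing instance with freshly built args.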
   1714 void GrpcLb::CreateOrUpdateRoundRobinPolicyLocked() {
   1715   if (shutting_down_) return;
   1716   grpc_channel_args* args = CreateRoundRobinPolicyArgsLocked();
   1717   GPR_ASSERT(args != nullptr);
   1718   if (rr_policy_ != nullptr) {
   1719     if (grpc_lb_glb_trace.enabled()) {
   1720       gpr_log(GPR_INFO, "[grpclb %p] Updating RR policy %p", this,
   1721               rr_policy_.get());
   1722     }
   1723     rr_policy_->UpdateLocked(*args);
   1724   } else {
   1725     LoadBalancingPolicy::Args lb_policy_args;
   1726     lb_policy_args.combiner = combiner();
   1727     lb_policy_args.client_channel_factory = client_channel_factory();
   1728     lb_policy_args.args = args;
   1729     CreateRoundRobinPolicyLocked(lb_policy_args);
   1730     if (grpc_lb_glb_trace.enabled()) {
   1731       gpr_log(GPR_INFO, "[grpclb %p] Created new RR policy %p", this,
   1732               rr_policy_.get());
   1733     }
   1734   }
   1735   grpc_channel_args_destroy(args);
   1736 }
   1737 
   1738 void GrpcLb::OnRoundRobinRequestReresolutionLocked(void* arg,
   1739                                                    grpc_error* error) {
   1740   GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
   1741   if (grpclb_policy->shutting_down_ || error != GRPC_ERROR_NONE) {
   1742     grpclb_policy->Unref(DEBUG_LOCATION, "on_rr_reresolution_requested");
   1743     return;
   1744   }
   1745   if (grpc_lb_glb_trace.enabled()) {
   1746     gpr_log(
   1747         GPR_INFO,
   1748         "[grpclb %p] Re-resolution requested from the internal RR policy (%p).",
   1749         grpclb_policy, grpclb_policy->rr_policy_.get());
   1750   }
    1751   // If we are talking to a balancer, we expect to get updated addresses from
   1752   // the balancer, so we can ignore the re-resolution request from the RR
   1753   // policy. Otherwise, handle the re-resolution request using the
   1754   // grpclb policy's original re-resolution closure.
   1755   if (grpclb_policy->lb_calld_ == nullptr ||
   1756       !grpclb_policy->lb_calld_->seen_initial_response()) {
   1757     grpclb_policy->TryReresolutionLocked(&grpc_lb_glb_trace, GRPC_ERROR_NONE);
   1758   }
   1759   // Give back the wrapper closure to the RR policy.
   1760   grpclb_policy->rr_policy_->SetReresolutionClosureLocked(
   1761       &grpclb_policy->on_rr_request_reresolution_);
   1762 }
   1763 
   1764 void GrpcLb::UpdateConnectivityStateFromRoundRobinPolicyLocked(
   1765     grpc_error* rr_state_error) {
   1766   const grpc_connectivity_state curr_glb_state =
   1767       grpc_connectivity_state_check(&state_tracker_);
   1768   /* The new connectivity status is a function of the previous one and the new
   1769    * input coming from the status of the RR policy.
   1770    *
   1771    *  current state (grpclb's)
   1772    *  |
   1773    *  v  || I  |  C  |  R  |  TF  |  SD  |  <- new state (RR's)
   1774    *  ===++====+=====+=====+======+======+
   1775    *   I || I  |  C  |  R  | [I]  | [I]  |
   1776    *  ---++----+-----+-----+------+------+
   1777    *   C || I  |  C  |  R  | [C]  | [C]  |
   1778    *  ---++----+-----+-----+------+------+
   1779    *   R || I  |  C  |  R  | [R]  | [R]  |
   1780    *  ---++----+-----+-----+------+------+
   1781    *  TF || I  |  C  |  R  | [TF] | [TF] |
   1782    *  ---++----+-----+-----+------+------+
   1783    *  SD || NA |  NA |  NA |  NA  |  NA  | (*)
   1784    *  ---++----+-----+-----+------+------+
   1785    *
   1786    * A [STATE] indicates that the old RR policy is kept. In those cases, STATE
   1787    * is the current state of grpclb, which is left untouched.
   1788    *
   1789    *  In summary, if the new state is TRANSIENT_FAILURE or SHUTDOWN, stick to
   1790    *  the previous RR instance.
   1791    *
   1792    *  Note that the status is never updated to SHUTDOWN as a result of calling
    1793    *  this function. Only ShutdownLocked() has the power to set that state.
   1794    *
   1795    *  (*) This function mustn't be called during shutting down. */
   1796   GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN);
   1797   switch (rr_connectivity_state_) {
   1798     case GRPC_CHANNEL_TRANSIENT_FAILURE:
   1799     case GRPC_CHANNEL_SHUTDOWN:
   1800       GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE);
   1801       break;
   1802     case GRPC_CHANNEL_IDLE:
   1803     case GRPC_CHANNEL_CONNECTING:
   1804     case GRPC_CHANNEL_READY:
   1805       GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE);
   1806   }
   1807   if (grpc_lb_glb_trace.enabled()) {
   1808     gpr_log(
   1809         GPR_INFO,
   1810         "[grpclb %p] Setting grpclb's state to %s from new RR policy %p state.",
   1811         this, grpc_connectivity_state_name(rr_connectivity_state_),
   1812         rr_policy_.get());
   1813   }
   1814   grpc_connectivity_state_set(&state_tracker_, rr_connectivity_state_,
   1815                               rr_state_error,
   1816                               "update_lb_connectivity_status_locked");
   1817 }
   1818 
   1819 void GrpcLb::OnRoundRobinConnectivityChangedLocked(void* arg,
   1820                                                    grpc_error* error) {
   1821   GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
   1822   if (grpclb_policy->shutting_down_) {
   1823     grpclb_policy->Unref(DEBUG_LOCATION, "on_rr_connectivity_changed");
   1824     return;
   1825   }
   1826   grpclb_policy->UpdateConnectivityStateFromRoundRobinPolicyLocked(
   1827       GRPC_ERROR_REF(error));
   1828   // Resubscribe. Reuse the "on_rr_connectivity_changed" ref.
   1829   grpclb_policy->rr_policy_->NotifyOnStateChangeLocked(
   1830       &grpclb_policy->rr_connectivity_state_,
   1831       &grpclb_policy->on_rr_connectivity_changed_);
   1832 }
   1833 
   1834 //
   1835 // factory
   1836 //
   1837 
   1838 class GrpcLbFactory : public LoadBalancingPolicyFactory {
   1839  public:
   1840   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
   1841       const LoadBalancingPolicy::Args& args) const override {
   1842     /* Count the number of gRPC-LB addresses. There must be at least one. */
   1843     const grpc_arg* arg =
   1844         grpc_channel_args_find(args.args, GRPC_ARG_LB_ADDRESSES);
   1845     if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
   1846       return nullptr;
   1847     }
   1848     grpc_lb_addresses* addresses =
   1849         static_cast<grpc_lb_addresses*>(arg->value.pointer.p);
   1850     size_t num_grpclb_addrs = 0;
   1851     for (size_t i = 0; i < addresses->num_addresses; ++i) {
   1852       if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
   1853     }
   1854     if (num_grpclb_addrs == 0) return nullptr;
   1855     return OrphanablePtr<LoadBalancingPolicy>(New<GrpcLb>(addresses, args));
   1856   }
   1857 
   1858   const char* name() const override { return "grpclb"; }
   1859 };
   1860 
   1861 }  // namespace
   1862 
   1863 }  // namespace grpc_core
   1864 
   1865 //
   1866 // Plugin registration
   1867 //
   1868 
   1869 namespace {
   1870 
   1871 // Only add client_load_reporting filter if the grpclb LB policy is used.
   1872 bool maybe_add_client_load_reporting_filter(grpc_channel_stack_builder* builder,
   1873                                             void* arg) {
   1874   const grpc_channel_args* args =
   1875       grpc_channel_stack_builder_get_channel_arguments(builder);
   1876   const grpc_arg* channel_arg =
   1877       grpc_channel_args_find(args, GRPC_ARG_LB_POLICY_NAME);
   1878   if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_STRING &&
   1879       strcmp(channel_arg->value.string, "grpclb") == 0) {
   1880     return grpc_channel_stack_builder_append_filter(
   1881         builder, (const grpc_channel_filter*)arg, nullptr, nullptr);
   1882   }
   1883   return true;
   1884 }
   1885 
   1886 }  // namespace
   1887 
   1888 void grpc_lb_policy_grpclb_init() {
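         // Registers the grpclb policy factory and a channel-init stage that adds the
         // client_load_reporting filter to subchannels whose LB policy is "grpclb".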
   1889   grpc_core::LoadBalancingPolicyRegistry::Builder::
   1890       RegisterLoadBalancingPolicyFactory(
   1891           grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
   1892               grpc_core::New<grpc_core::GrpcLbFactory>()));
   1893   grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
   1894                                    GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
   1895                                    maybe_add_client_load_reporting_filter,
   1896                                    (void*)&grpc_client_load_reporting_filter);
   1897 }
   1898 
   1899 void grpc_lb_policy_grpclb_shutdown() {}
   1900