/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/ext/filters/client_channel/client_channel.h"

#include <inttypes.h>
#include <limits.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>

#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/method_params.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/service_config.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/status_metadata.h"

using grpc_core::internal::ClientChannelMethodParams;
using grpc_core::internal::ServerRetryThrottleData;

/* Client channel implementation */

// By default, we buffer 256 KiB per RPC for retries.
// TODO(roth): Do we have any data to suggest a better value?
#define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)

// This value was picked arbitrarily.  It can be changed if there is
// any even moderately compelling reason to do so.
#define RETRY_BACKOFF_JITTER 0.2

grpc_core::TraceFlag grpc_client_channel_trace(false, "client_channel");

/*************************************************************************
 * CHANNEL-WIDE FUNCTIONS
 */

struct external_connectivity_watcher;

typedef grpc_core::SliceHashTable<
    grpc_core::RefCountedPtr<ClientChannelMethodParams>>
    MethodParamsTable;

typedef struct client_channel_channel_data {
  grpc_core::OrphanablePtr<grpc_core::Resolver> resolver;
  bool started_resolving;
  bool deadline_checking_enabled;
  grpc_client_channel_factory* client_channel_factory;
  bool enable_retries;
  size_t per_rpc_retry_buffer_size;

  /** combiner protecting all variables below in this data structure */
  grpc_combiner* combiner;
  /** currently active load balancer */
  grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> lb_policy;
  /** retry throttle data */
  grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
  /** maps method names to method_parameters structs */
  grpc_core::RefCountedPtr<MethodParamsTable> method_params_table;
  /** incoming resolver result - set by resolver.next() */
  grpc_channel_args* resolver_result;
  /** a list of closures that are all waiting for resolver result to come in */
  grpc_closure_list waiting_for_resolver_result_closures;
  /** resolver callback */
  grpc_closure on_resolver_result_changed;
  /** connectivity state being tracked */
  grpc_connectivity_state_tracker state_tracker;
  /** when an lb_policy arrives, should we try to exit idle */
  bool exit_idle_when_lb_policy_arrives;
  /** owning stack */
  grpc_channel_stack* owning_stack;
  /** interested parties (owned) */
  grpc_pollset_set* interested_parties;

  /* external_connectivity_watcher_list head is guarded by its own mutex, since
   * counts need to be grabbed immediately without polling on a cq */
  gpr_mu external_connectivity_watcher_list_mu;
  struct external_connectivity_watcher* external_connectivity_watcher_list_head;

  /* the following properties are guarded by a mutex since APIs require them
     to be instantaneously available */
  gpr_mu info_mu;
  grpc_core::UniquePtr<char> info_lb_policy_name;
  /** service config in JSON form */
  grpc_core::UniquePtr<char> info_service_config_json;
} channel_data;

typedef struct {
  channel_data* chand;
  /** used only as an identifier; do not dereference it, because the LB
   * policy may no longer exist by the time the callback runs */
  grpc_core::LoadBalancingPolicy* lb_policy;
  grpc_closure closure;
} reresolution_request_args;

/** We create one watcher for each new lb_policy that is returned from a
    resolver, to watch for state changes from the lb_policy. When a state
    change is seen, we update the channel, and create a new watcher. */
typedef struct {
  channel_data* chand;
  grpc_closure on_changed;
  grpc_connectivity_state state;
  grpc_core::LoadBalancingPolicy* lb_policy;
} lb_policy_connectivity_watcher;

static void watch_lb_policy_locked(channel_data* chand,
                                   grpc_core::LoadBalancingPolicy* lb_policy,
                                   grpc_connectivity_state current_state);

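// Sets the channel's connectivity state to the given state.  If an LB
// policy is currently active, also cancels the corresponding picks:
// picks with wait_for_ready=false on TRANSIENT_FAILURE, and all picks
// on SHUTDOWN.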
static void set_channel_connectivity_state_locked(channel_data* chand,
                                                  grpc_connectivity_state state,
                                                  grpc_error* error,
                                                  const char* reason) {
  /* TODO: Improve failure handling:
   * - Make it possible for policies to return GRPC_CHANNEL_TRANSIENT_FAILURE.
   * - Hand over pending picks from old policies during the switch that happens
   *   when resolver provides an update. */
  if (chand->lb_policy != nullptr) {
    if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
      /* cancel picks with wait_for_ready=false */
      chand->lb_policy->CancelMatchingPicksLocked(
          /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
          /* check= */ 0, GRPC_ERROR_REF(error));
    } else if (state == GRPC_CHANNEL_SHUTDOWN) {
      /* cancel all picks */
      chand->lb_policy->CancelMatchingPicksLocked(/* mask= */ 0, /* check= */ 0,
                                                  GRPC_ERROR_REF(error));
    }
  }
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p: setting connectivity state to %s", chand,
            grpc_connectivity_state_name(state));
  }
  grpc_connectivity_state_set(&chand->state_tracker, state, error, reason);
}

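// Callback invoked when the watched LB policy's connectivity state
// changes.  Acts only on notifications for the currently active policy
// and re-arms the watch unless the policy has shut down; stale watchers
// are simply freed.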
static void on_lb_policy_state_changed_locked(void* arg, grpc_error* error) {
  lb_policy_connectivity_watcher* w =
      static_cast<lb_policy_connectivity_watcher*>(arg);
  /* check if the notification is for the latest policy */
  if (w->lb_policy == w->chand->lb_policy.get()) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p: lb_policy=%p state changed to %s", w->chand,
              w->lb_policy, grpc_connectivity_state_name(w->state));
    }
    set_channel_connectivity_state_locked(w->chand, w->state,
                                          GRPC_ERROR_REF(error), "lb_changed");
    if (w->state != GRPC_CHANNEL_SHUTDOWN) {
      watch_lb_policy_locked(w->chand, w->lb_policy, w->state);
    }
  }
  GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack, "watch_lb_policy");
  gpr_free(w);
}

static void watch_lb_policy_locked(channel_data* chand,
                                   grpc_core::LoadBalancingPolicy* lb_policy,
                                   grpc_connectivity_state current_state) {
  lb_policy_connectivity_watcher* w =
      static_cast<lb_policy_connectivity_watcher*>(gpr_malloc(sizeof(*w)));
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");
  w->chand = chand;
  GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w,
                    grpc_combiner_scheduler(chand->combiner));
  w->state = current_state;
  w->lb_policy = lb_policy;
  lb_policy->NotifyOnStateChangeLocked(&w->state, &w->on_changed);
}

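// Starts the initial resolution request.  Takes a "resolver" ref to the
// channel stack, which is released in on_resolver_shutdown_locked()
// after the resolver reports shutdown.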
static void start_resolving_locked(channel_data* chand) {
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p: starting name resolution", chand);
  }
  GPR_ASSERT(!chand->started_resolving);
  chand->started_resolving = true;
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
  chand->resolver->NextLocked(&chand->resolver_result,
                              &chand->on_resolver_result_changed);
}

typedef struct {
  char* server_name;
  grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
} service_config_parsing_state;

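// Parses the "retryThrottling" field of the service config, if present.
// Illustrative example of the JSON shape this expects (values are
// hypothetical):
//   "retryThrottling": {
//     "maxTokens": 10,
//     "tokenRatio": 0.1
//   }
// maxTokens is stored internally in milli-tokens (multiplied by 1000),
// and tokenRatio is parsed with at most 3 decimal digits of precision.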
static void parse_retry_throttle_params(
    const grpc_json* field, service_config_parsing_state* parsing_state) {
  if (strcmp(field->key, "retryThrottling") == 0) {
    if (parsing_state->retry_throttle_data != nullptr) return;  // Duplicate.
    if (field->type != GRPC_JSON_OBJECT) return;
    int max_milli_tokens = 0;
    int milli_token_ratio = 0;
    for (grpc_json* sub_field = field->child; sub_field != nullptr;
         sub_field = sub_field->next) {
      if (sub_field->key == nullptr) return;
      if (strcmp(sub_field->key, "maxTokens") == 0) {
        if (max_milli_tokens != 0) return;  // Duplicate.
        if (sub_field->type != GRPC_JSON_NUMBER) return;
        max_milli_tokens = gpr_parse_nonnegative_int(sub_field->value);
        if (max_milli_tokens == -1) return;
        max_milli_tokens *= 1000;
      } else if (strcmp(sub_field->key, "tokenRatio") == 0) {
        if (milli_token_ratio != 0) return;  // Duplicate.
        if (sub_field->type != GRPC_JSON_NUMBER) return;
        // We support up to 3 decimal digits.
        size_t whole_len = strlen(sub_field->value);
        uint32_t multiplier = 1;
        uint32_t decimal_value = 0;
        const char* decimal_point = strchr(sub_field->value, '.');
        if (decimal_point != nullptr) {
          whole_len = static_cast<size_t>(decimal_point - sub_field->value);
          multiplier = 1000;
          size_t decimal_len = strlen(decimal_point + 1);
          if (decimal_len > 3) decimal_len = 3;
          if (!gpr_parse_bytes_to_uint32(decimal_point + 1, decimal_len,
                                         &decimal_value)) {
            return;
          }
          uint32_t decimal_multiplier = 1;
          for (size_t i = 0; i < (3 - decimal_len); ++i) {
            decimal_multiplier *= 10;
          }
          decimal_value *= decimal_multiplier;
        }
        uint32_t whole_value;
        if (!gpr_parse_bytes_to_uint32(sub_field->value, whole_len,
                                       &whole_value)) {
          return;
        }
        milli_token_ratio =
            static_cast<int>((whole_value * multiplier) + decimal_value);
        if (milli_token_ratio <= 0) return;
      }
    }
    parsing_state->retry_throttle_data =
        grpc_core::internal::ServerRetryThrottleMap::GetDataForServer(
            parsing_state->server_name, max_milli_tokens, milli_token_ratio);
  }
}

// Invoked from the resolver NextLocked() callback when the resolver
// is shutting down.
static void on_resolver_shutdown_locked(channel_data* chand,
                                        grpc_error* error) {
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p: shutting down", chand);
  }
  if (chand->lb_policy != nullptr) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", chand,
              chand->lb_policy.get());
    }
    grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
                                     chand->interested_parties);
    chand->lb_policy.reset();
  }
  if (chand->resolver != nullptr) {
    // This should never happen; it can only be triggered by a resolver
    // implementation spontaneously deciding to report shutdown without
    // being orphaned.  This code is included just to be defensive.
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p: spontaneous shutdown from resolver %p",
              chand, chand->resolver.get());
    }
    chand->resolver.reset();
    set_channel_connectivity_state_locked(
        chand, GRPC_CHANNEL_SHUTDOWN,
        GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
            "Resolver spontaneous shutdown", &error, 1),
        "resolver_spontaneous_shutdown");
  }
  grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
                             GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                                 "Channel disconnected", &error, 1));
  GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "resolver");
  grpc_channel_args_destroy(chand->resolver_result);
  chand->resolver_result = nullptr;
  GRPC_ERROR_UNREF(error);
}

// Returns the LB policy name from the resolver result.
static grpc_core::UniquePtr<char>
get_lb_policy_name_from_resolver_result_locked(channel_data* chand) {
  // Find LB policy name in channel args.
  const grpc_arg* channel_arg =
      grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
  const char* lb_policy_name = grpc_channel_arg_get_string(channel_arg);
  // Special case: If at least one balancer address is present, we use
  // the grpclb policy, regardless of what the resolver actually specified.
  channel_arg =
      grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
  if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) {
    grpc_lb_addresses* addresses =
        static_cast<grpc_lb_addresses*>(channel_arg->value.pointer.p);
    if (grpc_lb_addresses_contains_balancer_address(*addresses)) {
      if (lb_policy_name != nullptr &&
          gpr_stricmp(lb_policy_name, "grpclb") != 0) {
        gpr_log(GPR_INFO,
                "resolver requested LB policy %s but provided at least one "
                "balancer address -- forcing use of grpclb LB policy",
                lb_policy_name);
      }
      lb_policy_name = "grpclb";
    }
  }
  // Use pick_first if nothing was specified and we didn't select grpclb
  // above.
  if (lb_policy_name == nullptr) lb_policy_name = "pick_first";
  return grpc_core::UniquePtr<char>(gpr_strdup(lb_policy_name));
}

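// Re-resolution closure handed to the LB policy.  If the policy that
// requested re-resolution is still the current one, forwards the request
// to the resolver and re-arms the closure; otherwise, treats the
// invocation as a shutdown signal and cleans up.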
static void request_reresolution_locked(void* arg, grpc_error* error) {
  reresolution_request_args* args =
      static_cast<reresolution_request_args*>(arg);
  channel_data* chand = args->chand;
  // If this invocation is for a stale LB policy, treat it as an LB shutdown
  // signal.
  if (args->lb_policy != chand->lb_policy.get() || error != GRPC_ERROR_NONE ||
      chand->resolver == nullptr) {
    GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "re-resolution");
    gpr_free(args);
    return;
  }
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p: started name re-resolving", chand);
  }
  chand->resolver->RequestReresolutionLocked();
  // Give back the closure to the LB policy.
  chand->lb_policy->SetReresolutionClosureLocked(&args->closure);
}

// Creates a new LB policy, replacing any previous one.
// If the new policy is created successfully, sets *connectivity_state and
// *connectivity_error to its initial connectivity state; otherwise,
// leaves them unchanged.
static void create_new_lb_policy_locked(
    channel_data* chand, char* lb_policy_name,
    grpc_connectivity_state* connectivity_state,
    grpc_error** connectivity_error) {
  grpc_core::LoadBalancingPolicy::Args lb_policy_args;
  lb_policy_args.combiner = chand->combiner;
  lb_policy_args.client_channel_factory = chand->client_channel_factory;
  lb_policy_args.args = chand->resolver_result;
  grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> new_lb_policy =
      grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
          lb_policy_name, lb_policy_args);
  if (GPR_UNLIKELY(new_lb_policy == nullptr)) {
    gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
  } else {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p: created new LB policy \"%s\" (%p)", chand,
              lb_policy_name, new_lb_policy.get());
    }
    // Swap out the LB policy and update the fds in
    // chand->interested_parties.
    if (chand->lb_policy != nullptr) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", chand,
                chand->lb_policy.get());
      }
      grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
                                       chand->interested_parties);
      chand->lb_policy->HandOffPendingPicksLocked(new_lb_policy.get());
    }
    chand->lb_policy = std::move(new_lb_policy);
    grpc_pollset_set_add_pollset_set(chand->lb_policy->interested_parties(),
                                     chand->interested_parties);
    // Set up re-resolution callback.
    reresolution_request_args* args =
        static_cast<reresolution_request_args*>(gpr_zalloc(sizeof(*args)));
    args->chand = chand;
    args->lb_policy = chand->lb_policy.get();
    GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args,
                      grpc_combiner_scheduler(chand->combiner));
    GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution");
    chand->lb_policy->SetReresolutionClosureLocked(&args->closure);
    // Get the new LB policy's initial connectivity state and start a
    // connectivity watch.
    GRPC_ERROR_UNREF(*connectivity_error);
    *connectivity_state =
        chand->lb_policy->CheckConnectivityLocked(connectivity_error);
    if (chand->exit_idle_when_lb_policy_arrives) {
      chand->lb_policy->ExitIdleLocked();
      chand->exit_idle_when_lb_policy_arrives = false;
    }
    watch_lb_policy_locked(chand, chand->lb_policy.get(), *connectivity_state);
  }
}

// Returns the service config (as a JSON string) from the resolver result.
// Also updates state in chand.
static grpc_core::UniquePtr<char>
get_service_config_from_resolver_result_locked(channel_data* chand) {
  const grpc_arg* channel_arg =
      grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG);
  const char* service_config_json = grpc_channel_arg_get_string(channel_arg);
  if (service_config_json != nullptr) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"",
              chand, service_config_json);
    }
    grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config =
        grpc_core::ServiceConfig::Create(service_config_json);
    if (service_config != nullptr) {
      if (chand->enable_retries) {
        channel_arg =
            grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI);
        const char* server_uri = grpc_channel_arg_get_string(channel_arg);
        GPR_ASSERT(server_uri != nullptr);
        grpc_uri* uri = grpc_uri_parse(server_uri, true);
        GPR_ASSERT(uri->path[0] != '\0');
        service_config_parsing_state parsing_state;
        parsing_state.server_name =
            uri->path[0] == '/' ? uri->path + 1 : uri->path;
        service_config->ParseGlobalParams(parse_retry_throttle_params,
                                          &parsing_state);
        grpc_uri_destroy(uri);
        chand->retry_throttle_data =
            std::move(parsing_state.retry_throttle_data);
      }
      chand->method_params_table = service_config->CreateMethodConfigTable(
          ClientChannelMethodParams::CreateFromJson);
    }
  }
  return grpc_core::UniquePtr<char>(gpr_strdup(service_config_json));
}

// Callback invoked when a resolver result is available.
static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
  channel_data* chand = static_cast<channel_data*>(arg);
  if (grpc_client_channel_trace.enabled()) {
    const char* disposition =
        chand->resolver_result != nullptr
            ? ""
            : (error == GRPC_ERROR_NONE ? " (transient error)"
                                        : " (resolver shutdown)");
    gpr_log(GPR_INFO,
            "chand=%p: got resolver result: resolver_result=%p error=%s%s",
            chand, chand->resolver_result, grpc_error_string(error),
            disposition);
  }
  // Handle shutdown.
  if (error != GRPC_ERROR_NONE || chand->resolver == nullptr) {
    on_resolver_shutdown_locked(chand, GRPC_ERROR_REF(error));
    return;
  }
  // Data used to set the channel's connectivity state.
  bool set_connectivity_state = true;
  grpc_connectivity_state connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
  grpc_error* connectivity_error =
      GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
  // chand->resolver_result will be null in the case of a transient
  // resolution error.  In that case, we don't have any new result to
  // process, which means that we keep using the previous result (if any).
  if (chand->resolver_result == nullptr) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p: resolver transient failure", chand);
    }
  } else {
    grpc_core::UniquePtr<char> lb_policy_name =
        get_lb_policy_name_from_resolver_result_locked(chand);
    // Check to see if we're already using the right LB policy.
    // Note: It's safe to use chand->info_lb_policy_name here without
    // taking a lock on chand->info_mu, because this function is the
    // only thing that modifies its value, and it can only be invoked
    // once at any given time.
    bool lb_policy_name_changed = chand->info_lb_policy_name == nullptr ||
                                  gpr_stricmp(chand->info_lb_policy_name.get(),
                                              lb_policy_name.get()) != 0;
    if (chand->lb_policy != nullptr && !lb_policy_name_changed) {
      // Continue using the same LB policy.  Update with new addresses.
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO, "chand=%p: updating existing LB policy \"%s\" (%p)",
                chand, lb_policy_name.get(), chand->lb_policy.get());
      }
      chand->lb_policy->UpdateLocked(*chand->resolver_result);
      // No need to set the channel's connectivity state; the existing
      // watch on the LB policy will take care of that.
      set_connectivity_state = false;
    } else {
      // Instantiate new LB policy.
      create_new_lb_policy_locked(chand, lb_policy_name.get(),
                                  &connectivity_state, &connectivity_error);
    }
    // Find service config.
    grpc_core::UniquePtr<char> service_config_json =
        get_service_config_from_resolver_result_locked(chand);
    // Swap out the data used by cc_get_channel_info().
    gpr_mu_lock(&chand->info_mu);
    chand->info_lb_policy_name = std::move(lb_policy_name);
    chand->info_service_config_json = std::move(service_config_json);
    gpr_mu_unlock(&chand->info_mu);
    // Clean up.
    grpc_channel_args_destroy(chand->resolver_result);
    chand->resolver_result = nullptr;
  }
  // Set the channel's connectivity state if needed.
  if (set_connectivity_state) {
    set_channel_connectivity_state_locked(
        chand, connectivity_state, connectivity_error, "resolver_result");
  } else {
    GRPC_ERROR_UNREF(connectivity_error);
  }
  // Invoke closures that were waiting for results and renew the watch.
  GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
  chand->resolver->NextLocked(&chand->resolver_result,
                              &chand->on_resolver_result_changed);
}

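// Combiner-hosted counterpart of cc_start_transport_op().  Applies the
// op while holding the channel combiner: connectivity-state watches,
// pings, disconnects, and connect-backoff resets.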
static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
  grpc_transport_op* op = static_cast<grpc_transport_op*>(arg);
  grpc_channel_element* elem =
      static_cast<grpc_channel_element*>(op->handler_private.extra_arg);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);

  if (op->on_connectivity_state_change != nullptr) {
    grpc_connectivity_state_notify_on_state_change(
        &chand->state_tracker, op->connectivity_state,
        op->on_connectivity_state_change);
    op->on_connectivity_state_change = nullptr;
    op->connectivity_state = nullptr;
  }

  if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
    if (chand->lb_policy == nullptr) {
      grpc_error* error =
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing");
      GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
      GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
    } else {
      grpc_error* error = GRPC_ERROR_NONE;
      grpc_core::LoadBalancingPolicy::PickState pick_state;
      pick_state.initial_metadata = nullptr;
      pick_state.initial_metadata_flags = 0;
      pick_state.on_complete = nullptr;
      memset(&pick_state.subchannel_call_context, 0,
             sizeof(pick_state.subchannel_call_context));
      pick_state.user_data = nullptr;
      // Pick must return synchronously, because pick_state.on_complete is null.
      GPR_ASSERT(chand->lb_policy->PickLocked(&pick_state, &error));
      if (pick_state.connected_subchannel != nullptr) {
        pick_state.connected_subchannel->Ping(op->send_ping.on_initiate,
                                              op->send_ping.on_ack);
      } else {
        if (error == GRPC_ERROR_NONE) {
          error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
              "LB policy dropped call on ping");
        }
        GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
        GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
      }
      op->bind_pollset = nullptr;
    }
    op->send_ping.on_initiate = nullptr;
    op->send_ping.on_ack = nullptr;
  }

  if (op->disconnect_with_error != GRPC_ERROR_NONE) {
    if (chand->resolver != nullptr) {
      set_channel_connectivity_state_locked(
          chand, GRPC_CHANNEL_SHUTDOWN,
          GRPC_ERROR_REF(op->disconnect_with_error), "disconnect");
      chand->resolver.reset();
      if (!chand->started_resolving) {
        grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
                                   GRPC_ERROR_REF(op->disconnect_with_error));
        GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
      }
      if (chand->lb_policy != nullptr) {
        grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
                                         chand->interested_parties);
        chand->lb_policy.reset();
      }
    }
    GRPC_ERROR_UNREF(op->disconnect_with_error);
  }

  if (op->reset_connect_backoff) {
    if (chand->resolver != nullptr) {
      chand->resolver->ResetBackoffLocked();
      chand->resolver->RequestReresolutionLocked();
    }
    if (chand->lb_policy != nullptr) {
      chand->lb_policy->ResetBackoffLocked();
    }
  }

  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "start_transport_op");

  GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
}

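// Entry point for transport ops from the surface.  Binds any pollset
// immediately, then hops into the channel combiner to apply the rest of
// the op, since the channel's mutable state is combiner-guarded.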
static void cc_start_transport_op(grpc_channel_element* elem,
                                  grpc_transport_op* op) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);

  GPR_ASSERT(op->set_accept_stream == false);
  if (op->bind_pollset != nullptr) {
    grpc_pollset_set_add_pollset(chand->interested_parties, op->bind_pollset);
  }

  op->handler_private.extra_arg = elem;
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op");
  GRPC_CLOSURE_SCHED(
      GRPC_CLOSURE_INIT(&op->handler_private.closure, start_transport_op_locked,
                        op, grpc_combiner_scheduler(chand->combiner)),
      GRPC_ERROR_NONE);
}

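// Reports the channel's current LB policy name and service config JSON.
// These are guarded by info_mu rather than the combiner so that they can
// be read synchronously, without scheduling a combiner closure.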
static void cc_get_channel_info(grpc_channel_element* elem,
                                const grpc_channel_info* info) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  gpr_mu_lock(&chand->info_mu);
  if (info->lb_policy_name != nullptr) {
    *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name.get());
  }
  if (info->service_config_json != nullptr) {
    *info->service_config_json =
        gpr_strdup(chand->info_service_config_json.get());
  }
  gpr_mu_unlock(&chand->info_mu);
}

/* Constructor for channel_data */
static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
                                        grpc_channel_element_args* args) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  GPR_ASSERT(args->is_last);
  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
  // Initialize data members.
  chand->combiner = grpc_combiner_create();
  gpr_mu_init(&chand->info_mu);
  gpr_mu_init(&chand->external_connectivity_watcher_list_mu);

  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  chand->external_connectivity_watcher_list_head = nullptr;
  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);

  chand->owning_stack = args->channel_stack;
  GRPC_CLOSURE_INIT(&chand->on_resolver_result_changed,
                    on_resolver_result_changed_locked, chand,
                    grpc_combiner_scheduler(chand->combiner));
  chand->interested_parties = grpc_pollset_set_create();
  grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
                               "client_channel");
  grpc_client_channel_start_backup_polling(chand->interested_parties);
  // Record max per-RPC retry buffer size.
  const grpc_arg* arg = grpc_channel_args_find(
      args->channel_args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE);
  chand->per_rpc_retry_buffer_size = (size_t)grpc_channel_arg_get_integer(
      arg, {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX});
  // Record enable_retries.
  arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_ENABLE_RETRIES);
  chand->enable_retries = grpc_channel_arg_get_bool(arg, true);
  // Record client channel factory.
  arg = grpc_channel_args_find(args->channel_args,
                               GRPC_ARG_CLIENT_CHANNEL_FACTORY);
  if (arg == nullptr) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "Missing client channel factory in args for client channel filter");
  }
  if (arg->type != GRPC_ARG_POINTER) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "client channel factory arg must be a pointer");
  }
  grpc_client_channel_factory_ref(
      static_cast<grpc_client_channel_factory*>(arg->value.pointer.p));
  chand->client_channel_factory =
      static_cast<grpc_client_channel_factory*>(arg->value.pointer.p);
  // Get server name to resolve, using proxy mapper if needed.
  arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI);
  if (arg == nullptr) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "Missing server uri in args for client channel filter");
  }
  if (arg->type != GRPC_ARG_STRING) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "server uri arg must be a string");
  }
  char* proxy_name = nullptr;
  grpc_channel_args* new_args = nullptr;
  grpc_proxy_mappers_map_name(arg->value.string, args->channel_args,
                              &proxy_name, &new_args);
  // Instantiate resolver.
  chand->resolver = grpc_core::ResolverRegistry::CreateResolver(
      proxy_name != nullptr ? proxy_name : arg->value.string,
      new_args != nullptr ? new_args : args->channel_args,
      chand->interested_parties, chand->combiner);
  if (proxy_name != nullptr) gpr_free(proxy_name);
  if (new_args != nullptr) grpc_channel_args_destroy(new_args);
  if (chand->resolver == nullptr) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed");
  }
  chand->deadline_checking_enabled =
      grpc_deadline_checking_enabled(args->channel_args);
  return GRPC_ERROR_NONE;
}

/* Destructor for channel_data */
static void cc_destroy_channel_elem(grpc_channel_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (chand->resolver != nullptr) {
    // The only way we can get here is if we never started resolving,
    // because we take a ref to the channel stack when we start
    // resolving and do not release it until the resolver callback is
    // invoked after the resolver shuts down.
    chand->resolver.reset();
  }
  if (chand->client_channel_factory != nullptr) {
    grpc_client_channel_factory_unref(chand->client_channel_factory);
  }
  if (chand->lb_policy != nullptr) {
    grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
                                     chand->interested_parties);
    chand->lb_policy.reset();
  }
  // TODO(roth): Once we convert the filter API to C++, there will no
  // longer be any need to explicitly reset these smart pointer data members.
  chand->info_lb_policy_name.reset();
  chand->info_service_config_json.reset();
  chand->retry_throttle_data.reset();
  chand->method_params_table.reset();
  grpc_client_channel_stop_backup_polling(chand->interested_parties);
  grpc_connectivity_state_destroy(&chand->state_tracker);
  grpc_pollset_set_destroy(chand->interested_parties);
  GRPC_COMBINER_UNREF(chand->combiner, "client_channel");
  gpr_mu_destroy(&chand->info_mu);
  gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu);
}

/*************************************************************************
 * PER-CALL FUNCTIONS
 */

// Max number of batches that can be pending on a call at any given
// time.  This includes one batch for each of the following ops:
//   recv_initial_metadata
//   send_initial_metadata
//   recv_message
//   send_message
//   recv_trailing_metadata
//   send_trailing_metadata
#define MAX_PENDING_BATCHES 6

// Retry support:
//
// In order to support retries, we act as a proxy for stream op batches.
// When we get a batch from the surface, we add it to our list of pending
// batches, and we then use those batches to construct separate "child"
// batches to be started on the subchannel call.  When the child batches
// return, we then decide which pending batches have been completed and
// schedule their callbacks accordingly.  If a subchannel call fails and
// we want to retry it, we do a new pick and start again, constructing
// new "child" batches for the new subchannel call.
//
// Note that retries are committed when receiving data from the server
// (except for Trailers-Only responses).  However, there may be many
// send ops started before receiving any data, so we may have already
// completed some number of send ops (and returned the completions up to
// the surface) by the time we realize that we need to retry.  To deal
// with this, we cache data for send ops, so that we can replay them on a
// different subchannel call even after we have completed the original
// batches.
//
// There are two sets of data to maintain:
// - In call_data (in the parent channel), we maintain a list of pending
//   ops and cached data for send ops.
// - In the subchannel call, we maintain state to indicate what ops have
//   already been sent down to that call.
//
// When constructing the "child" batches, we compare those two sets of
// data to see which batches need to be sent to the subchannel call.

// TODO(roth): In subsequent PRs:
// - add support for transparent retries (including initial metadata)
// - figure out how to record stats in census for retries
//   (census filter is on top of this one)
// - add census stats for retries

// State used for starting a retryable batch on a subchannel call.
// This provides its own grpc_transport_stream_op_batch and other data
// structures needed to populate the ops in the batch.
// We allocate one struct on the arena for each attempt at starting a
// batch on a given subchannel call.
typedef struct {
  gpr_refcount refs;
  grpc_call_element* elem;
  grpc_subchannel_call* subchannel_call;  // Holds a ref.
  // The batch to use in the subchannel call.
  // Its payload field points to subchannel_call_retry_state.batch_payload.
  grpc_transport_stream_op_batch batch;
  // For intercepting on_complete.
  grpc_closure on_complete;
} subchannel_batch_data;

// Retry state associated with a subchannel call.
// Stored in the parent_data of the subchannel call object.
typedef struct {
  // subchannel_batch_data.batch.payload points to this.
  grpc_transport_stream_op_batch_payload batch_payload;
  // For send_initial_metadata.
  // Note that we need to make a copy of the initial metadata for each
  // subchannel call instead of just referring to the copy in call_data,
  // because filters in the subchannel stack will probably add entries,
  // so we need to start in a pristine state for each attempt of the call.
  grpc_linked_mdelem* send_initial_metadata_storage;
  grpc_metadata_batch send_initial_metadata;
  // For send_message.
  grpc_core::ManualConstructor<grpc_core::ByteStreamCache::CachingByteStream>
      send_message;
  // For send_trailing_metadata.
  grpc_linked_mdelem* send_trailing_metadata_storage;
  grpc_metadata_batch send_trailing_metadata;
  // For intercepting recv_initial_metadata.
  grpc_metadata_batch recv_initial_metadata;
  grpc_closure recv_initial_metadata_ready;
  bool trailing_metadata_available;
  // For intercepting recv_message.
  grpc_closure recv_message_ready;
  grpc_core::OrphanablePtr<grpc_core::ByteStream> recv_message;
  // For intercepting recv_trailing_metadata.
  grpc_metadata_batch recv_trailing_metadata;
  grpc_transport_stream_stats collect_stats;
  grpc_closure recv_trailing_metadata_ready;
  // These fields indicate which ops have been started and completed on
  // this subchannel call.
  size_t started_send_message_count;
  size_t completed_send_message_count;
  size_t started_recv_message_count;
  size_t completed_recv_message_count;
  bool started_send_initial_metadata : 1;
  bool completed_send_initial_metadata : 1;
  bool started_send_trailing_metadata : 1;
  bool completed_send_trailing_metadata : 1;
  bool started_recv_initial_metadata : 1;
  bool completed_recv_initial_metadata : 1;
  bool started_recv_trailing_metadata : 1;
  bool completed_recv_trailing_metadata : 1;
  // State for callback processing.
  bool retry_dispatched : 1;
  subchannel_batch_data* recv_initial_metadata_ready_deferred_batch;
  grpc_error* recv_initial_metadata_error;
  subchannel_batch_data* recv_message_ready_deferred_batch;
  grpc_error* recv_message_error;
  subchannel_batch_data* recv_trailing_metadata_internal_batch;
} subchannel_call_retry_state;

// Pending batches stored in call data.
typedef struct {
  // The pending batch.  If nullptr, this slot is empty.
  grpc_transport_stream_op_batch* batch;
  // Indicates whether payload for send ops has been cached in call data.
  bool send_ops_cached;
} pending_batch;

/** Call data.  Holds a pointer to grpc_subchannel_call and the
    associated machinery to create such a pointer.
    Handles queueing of stream ops until a call object is ready, waiting
    for initial metadata before trying to create a call object,
    and handling cancellation gracefully. */
typedef struct client_channel_call_data {
  // State for handling deadlines.
  // The code in deadline_filter.c requires this to be the first field.
  // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
  // and this struct both independently store pointers to the call stack
  // and call combiner.  If/when we have time, find a way to avoid this
  // without breaking the grpc_deadline_state abstraction.
  grpc_deadline_state deadline_state;

  grpc_slice path;  // Request path.
  gpr_timespec call_start_time;
  grpc_millis deadline;
  gpr_arena* arena;
  grpc_call_stack* owning_call;
  grpc_call_combiner* call_combiner;

  grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
  grpc_core::RefCountedPtr<ClientChannelMethodParams> method_params;

  grpc_subchannel_call* subchannel_call;

  // Set when we get a cancel_stream op.
  grpc_error* cancel_error;

  grpc_core::LoadBalancingPolicy::PickState pick;
  grpc_closure pick_closure;
  grpc_closure pick_cancel_closure;

  // state needed to support channelz interception of recv trailing metadata.
  grpc_closure recv_trailing_metadata_ready_channelz;
  grpc_closure* original_recv_trailing_metadata;
  grpc_metadata_batch* recv_trailing_metadata;

  grpc_polling_entity* pollent;
  bool pollent_added_to_interested_parties;

  // Batches are added to this list when received from above.
  // They are removed when we are done handling the batch (i.e., when
  // either we have invoked all of the batch's callbacks or we have
  // passed the batch down to the subchannel call and are not
  // intercepting any of its callbacks).
  pending_batch pending_batches[MAX_PENDING_BATCHES];
  bool pending_send_initial_metadata : 1;
  bool pending_send_message : 1;
  bool pending_send_trailing_metadata : 1;

  // Retry state.
  bool enable_retries : 1;
  bool retry_committed : 1;
  bool last_attempt_got_server_pushback : 1;
  int num_attempts_completed;
  size_t bytes_buffered_for_retry;
  grpc_core::ManualConstructor<grpc_core::BackOff> retry_backoff;
  grpc_timer retry_timer;

  // The number of pending retriable subchannel batches containing send ops.
  // We hold a ref to the call stack while this is non-zero, since replay
  // batches may not complete until after all callbacks have been returned
  // to the surface, and we need to make sure that the call is not destroyed
  // until all of these batches have completed.
  // Note that we actually only need to track replay batches, but it's
  // easier to track all batches with send ops.
  int num_pending_retriable_subchannel_send_batches;

  // Cached data for retrying send ops.
  // send_initial_metadata
  bool seen_send_initial_metadata;
  grpc_linked_mdelem* send_initial_metadata_storage;
  grpc_metadata_batch send_initial_metadata;
  uint32_t send_initial_metadata_flags;
  gpr_atm* peer_string;
  // send_message
  // When we get a send_message op, we replace the original byte stream
  // with a CachingByteStream that caches the slices to a local buffer for
  // use in retries.
  // Note: We inline the cache for the first 3 send_message ops and use
  // dynamic allocation after that.  This number was essentially picked
  // at random; it could be changed in the future to tune performance.
  grpc_core::ManualConstructor<
      grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3>>
      send_messages;
  // send_trailing_metadata
  bool seen_send_trailing_metadata;
  grpc_linked_mdelem* send_trailing_metadata_storage;
  grpc_metadata_batch send_trailing_metadata;
} call_data;

// Forward declarations.
static void retry_commit(grpc_call_element* elem,
                         subchannel_call_retry_state* retry_state);
static void start_internal_recv_trailing_metadata(grpc_call_element* elem);
static void on_complete(void* arg, grpc_error* error);
static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored);
static void start_pick_locked(void* arg, grpc_error* ignored);
static void maybe_intercept_recv_trailing_metadata_for_channelz(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch);

//
// send op data caching
//

// Caches data for send ops so that it can be retried later, if not
// already cached.
static void maybe_cache_send_ops_for_batch(call_data* calld,
                                           pending_batch* pending) {
  if (pending->send_ops_cached) return;
  pending->send_ops_cached = true;
  grpc_transport_stream_op_batch* batch = pending->batch;
  // Save a copy of metadata for send_initial_metadata ops.
  if (batch->send_initial_metadata) {
    calld->seen_send_initial_metadata = true;
    GPR_ASSERT(calld->send_initial_metadata_storage == nullptr);
    grpc_metadata_batch* send_initial_metadata =
        batch->payload->send_initial_metadata.send_initial_metadata;
    calld->send_initial_metadata_storage = (grpc_linked_mdelem*)gpr_arena_alloc(
        calld->arena,
        sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count);
    grpc_metadata_batch_copy(send_initial_metadata,
                             &calld->send_initial_metadata,
                             calld->send_initial_metadata_storage);
    calld->send_initial_metadata_flags =
        batch->payload->send_initial_metadata.send_initial_metadata_flags;
    calld->peer_string = batch->payload->send_initial_metadata.peer_string;
  }
  // Set up cache for send_message ops.
  if (batch->send_message) {
    grpc_core::ByteStreamCache* cache =
        static_cast<grpc_core::ByteStreamCache*>(
            gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache)));
    new (cache) grpc_core::ByteStreamCache(
        std::move(batch->payload->send_message.send_message));
    calld->send_messages->push_back(cache);
  }
  // Save metadata batch for send_trailing_metadata ops.
  if (batch->send_trailing_metadata) {
    calld->seen_send_trailing_metadata = true;
    GPR_ASSERT(calld->send_trailing_metadata_storage == nullptr);
    grpc_metadata_batch* send_trailing_metadata =
        batch->payload->send_trailing_metadata.send_trailing_metadata;
    calld->send_trailing_metadata_storage =
        (grpc_linked_mdelem*)gpr_arena_alloc(
            calld->arena,
            sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count);
    grpc_metadata_batch_copy(send_trailing_metadata,
                             &calld->send_trailing_metadata,
                             calld->send_trailing_metadata_storage);
  }
}

// Frees cached send_initial_metadata.
static void free_cached_send_initial_metadata(channel_data* chand,
                                              call_data* calld) {
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: destroying calld->send_initial_metadata", chand,
            calld);
  }
  grpc_metadata_batch_destroy(&calld->send_initial_metadata);
}

// Frees cached send_message at index idx.
static void free_cached_send_message(channel_data* chand, call_data* calld,
                                     size_t idx) {
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR "]",
            chand, calld, idx);
  }
  (*calld->send_messages)[idx]->Destroy();
}

// Frees cached send_trailing_metadata.
static void free_cached_send_trailing_metadata(channel_data* chand,
                                               call_data* calld) {
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: destroying calld->send_trailing_metadata",
            chand, calld);
  }
  grpc_metadata_batch_destroy(&calld->send_trailing_metadata);
}

// Frees cached send ops that have already been completed after
// committing the call.
static void free_cached_send_op_data_after_commit(
    grpc_call_element* elem, subchannel_call_retry_state* retry_state) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (retry_state->completed_send_initial_metadata) {
    free_cached_send_initial_metadata(chand, calld);
  }
  for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) {
    free_cached_send_message(chand, calld, i);
  }
  if (retry_state->completed_send_trailing_metadata) {
    free_cached_send_trailing_metadata(chand, calld);
  }
}

// Frees cached send ops that were completed by the completed batch in
// batch_data.  Used when batches are completed after the call is committed.
static void free_cached_send_op_data_for_completed_batch(
    grpc_call_element* elem, subchannel_batch_data* batch_data,
    subchannel_call_retry_state* retry_state) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (batch_data->batch.send_initial_metadata) {
    free_cached_send_initial_metadata(chand, calld);
  }
  if (batch_data->batch.send_message) {
    free_cached_send_message(chand, calld,
                             retry_state->completed_send_message_count - 1);
  }
  if (batch_data->batch.send_trailing_metadata) {
    free_cached_send_trailing_metadata(chand, calld);
  }
}

//
// pending_batches management
//

// Returns the index into calld->pending_batches to be used for batch.
   1131 static size_t get_batch_index(grpc_transport_stream_op_batch* batch) {
   1132   // Note: It is important that send_initial_metadata be the first entry
   1133   // here, since the code in pick_subchannel_locked() assumes it will be.
   1134   if (batch->send_initial_metadata) return 0;
   1135   if (batch->send_message) return 1;
   1136   if (batch->send_trailing_metadata) return 2;
   1137   if (batch->recv_initial_metadata) return 3;
   1138   if (batch->recv_message) return 4;
   1139   if (batch->recv_trailing_metadata) return 5;
   1140   GPR_UNREACHABLE_CODE(return static_cast<size_t>(-1));
   1141 }
   1142 
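        // For example, a batch carrying both send_initial_metadata and
        // recv_initial_metadata is filed under index 0: the first matching op
        // above wins.  Each slot holds at most one pending batch at a time,
        // as enforced by the GPR_ASSERT in pending_batches_add().
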
   1143 // This is called via the call combiner, so access to calld is synchronized.
   1144 static void pending_batches_add(grpc_call_element* elem,
   1145                                 grpc_transport_stream_op_batch* batch) {
   1146   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   1147   call_data* calld = static_cast<call_data*>(elem->call_data);
   1148   const size_t idx = get_batch_index(batch);
   1149   if (grpc_client_channel_trace.enabled()) {
   1150     gpr_log(GPR_INFO,
   1151             "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand,
   1152             calld, idx);
   1153   }
   1154   pending_batch* pending = &calld->pending_batches[idx];
   1155   GPR_ASSERT(pending->batch == nullptr);
   1156   pending->batch = batch;
   1157   pending->send_ops_cached = false;
   1158   if (calld->enable_retries) {
   1159     // Update state in calld about pending batches.
   1160     // Also check if the batch takes us over the retry buffer limit.
   1161     // Note: We don't check the size of trailing metadata here, because
   1162     // gRPC clients do not send trailing metadata.
   1163     if (batch->send_initial_metadata) {
   1164       calld->pending_send_initial_metadata = true;
   1165       calld->bytes_buffered_for_retry += grpc_metadata_batch_size(
   1166           batch->payload->send_initial_metadata.send_initial_metadata);
   1167     }
   1168     if (batch->send_message) {
   1169       calld->pending_send_message = true;
   1170       calld->bytes_buffered_for_retry +=
   1171           batch->payload->send_message.send_message->length();
   1172     }
   1173     if (batch->send_trailing_metadata) {
   1174       calld->pending_send_trailing_metadata = true;
   1175     }
   1176     if (GPR_UNLIKELY(calld->bytes_buffered_for_retry >
   1177                      chand->per_rpc_retry_buffer_size)) {
   1178       if (grpc_client_channel_trace.enabled()) {
   1179         gpr_log(GPR_INFO,
   1180                 "chand=%p calld=%p: exceeded retry buffer size, committing",
   1181                 chand, calld);
   1182       }
   1183       subchannel_call_retry_state* retry_state =
   1184           calld->subchannel_call == nullptr
   1185               ? nullptr
   1186               : static_cast<subchannel_call_retry_state*>(
   1187                     grpc_connected_subchannel_call_get_parent_data(
   1188                         calld->subchannel_call));
   1189       retry_commit(elem, retry_state);
   1190       // If we are not going to retry and have not yet started, pretend
   1191       // If we are not going to retry and have not yet completed the first
   1192       // attempt, pretend retries are disabled to avoid retry overhead.
   1193         if (grpc_client_channel_trace.enabled()) {
   1194           gpr_log(GPR_INFO,
   1195                   "chand=%p calld=%p: disabling retries before first attempt",
   1196                   chand, calld);
   1197         }
   1198         calld->enable_retries = false;
   1199       }
   1200     }
   1201   }
   1202 }
   1203 
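        // Worked example of the buffer accounting above: with the default
        // 256 KiB per-RPC retry buffer, an RPC whose cached initial metadata
        // plus messages reach, say, 300 KiB trips the limit on the offending
        // batch; the call is committed immediately, and if no attempt has
        // completed yet, retries are disabled outright for this call.
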
   1204 static void pending_batch_clear(call_data* calld, pending_batch* pending) {
   1205   if (calld->enable_retries) {
   1206     if (pending->batch->send_initial_metadata) {
   1207       calld->pending_send_initial_metadata = false;
   1208     }
   1209     if (pending->batch->send_message) {
   1210       calld->pending_send_message = false;
   1211     }
   1212     if (pending->batch->send_trailing_metadata) {
   1213       calld->pending_send_trailing_metadata = false;
   1214     }
   1215   }
   1216   pending->batch = nullptr;
   1217 }
   1218 
   1219 // This is called via the call combiner, so access to calld is synchronized.
   1220 static void fail_pending_batch_in_call_combiner(void* arg, grpc_error* error) {
   1221   grpc_transport_stream_op_batch* batch =
   1222       static_cast<grpc_transport_stream_op_batch*>(arg);
   1223   call_data* calld = static_cast<call_data*>(batch->handler_private.extra_arg);
   1224   // Note: This will release the call combiner.
   1225   grpc_transport_stream_op_batch_finish_with_failure(
   1226       batch, GRPC_ERROR_REF(error), calld->call_combiner);
   1227 }
   1228 
   1229 // This is called via the call combiner, so access to calld is synchronized.
   1230 // If yield_call_combiner is true, assumes responsibility for yielding
   1231 // the call combiner.
   1232 static void pending_batches_fail(grpc_call_element* elem, grpc_error* error,
   1233                                  bool yield_call_combiner) {
   1234   GPR_ASSERT(error != GRPC_ERROR_NONE);
   1235   call_data* calld = static_cast<call_data*>(elem->call_data);
   1236   if (grpc_client_channel_trace.enabled()) {
   1237     size_t num_batches = 0;
   1238     for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
   1239       if (calld->pending_batches[i].batch != nullptr) ++num_batches;
   1240     }
   1241     gpr_log(GPR_INFO,
   1242             "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
   1243             elem->channel_data, calld, num_batches, grpc_error_string(error));
   1244   }
   1245   grpc_core::CallCombinerClosureList closures;
   1246   for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
   1247     pending_batch* pending = &calld->pending_batches[i];
   1248     grpc_transport_stream_op_batch* batch = pending->batch;
   1249     if (batch != nullptr) {
   1250       batch->handler_private.extra_arg = calld;
   1251       GRPC_CLOSURE_INIT(&batch->handler_private.closure,
   1252                         fail_pending_batch_in_call_combiner, batch,
   1253                         grpc_schedule_on_exec_ctx);
   1254       closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
   1255                    "pending_batches_fail");
   1256       pending_batch_clear(calld, pending);
   1257     }
   1258   }
   1259   if (yield_call_combiner) {
   1260     closures.RunClosures(calld->call_combiner);
   1261   } else {
   1262     closures.RunClosuresWithoutYielding(calld->call_combiner);
   1263   }
   1264   GRPC_ERROR_UNREF(error);
   1265 }
   1266 
   1267 // This is called via the call combiner, so access to calld is synchronized.
   1268 static void resume_pending_batch_in_call_combiner(void* arg,
   1269                                                   grpc_error* ignored) {
   1270   grpc_transport_stream_op_batch* batch =
   1271       static_cast<grpc_transport_stream_op_batch*>(arg);
   1272   grpc_subchannel_call* subchannel_call =
   1273       static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg);
   1274   // Note: This will release the call combiner.
   1275   grpc_subchannel_call_process_op(subchannel_call, batch);
   1276 }
   1277 
   1278 // This is called via the call combiner, so access to calld is synchronized.
   1279 static void pending_batches_resume(grpc_call_element* elem) {
   1280   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   1281   call_data* calld = static_cast<call_data*>(elem->call_data);
   1282   if (calld->enable_retries) {
   1283     start_retriable_subchannel_batches(elem, GRPC_ERROR_NONE);
   1284     return;
   1285   }
   1286   // Retries not enabled; send down batches as-is.
   1287   if (grpc_client_channel_trace.enabled()) {
   1288     size_t num_batches = 0;
   1289     for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
   1290       if (calld->pending_batches[i].batch != nullptr) ++num_batches;
   1291     }
   1292     gpr_log(GPR_INFO,
   1293             "chand=%p calld=%p: starting %" PRIuPTR
   1294             " pending batches on subchannel_call=%p",
   1295             chand, calld, num_batches, calld->subchannel_call);
   1296   }
   1297   grpc_core::CallCombinerClosureList closures;
   1298   for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
   1299     pending_batch* pending = &calld->pending_batches[i];
   1300     grpc_transport_stream_op_batch* batch = pending->batch;
   1301     if (batch != nullptr) {
   1302       maybe_intercept_recv_trailing_metadata_for_channelz(elem, batch);
   1303       batch->handler_private.extra_arg = calld->subchannel_call;
   1304       GRPC_CLOSURE_INIT(&batch->handler_private.closure,
   1305                         resume_pending_batch_in_call_combiner, batch,
   1306                         grpc_schedule_on_exec_ctx);
   1307       closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
   1308                    "pending_batches_resume");
   1309       pending_batch_clear(calld, pending);
   1310     }
   1311   }
   1312   // Note: This will release the call combiner.
   1313   closures.RunClosures(calld->call_combiner);
   1314 }
   1315 
   1316 static void maybe_clear_pending_batch(grpc_call_element* elem,
   1317                                       pending_batch* pending) {
   1318   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   1319   call_data* calld = static_cast<call_data*>(elem->call_data);
   1320   grpc_transport_stream_op_batch* batch = pending->batch;
   1321   // We clear the pending batch if all of its callbacks have been
   1322   // scheduled and their pointers reset to nullptr.
   1323   if (batch->on_complete == nullptr &&
   1324       (!batch->recv_initial_metadata ||
   1325        batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
   1326            nullptr) &&
   1327       (!batch->recv_message ||
   1328        batch->payload->recv_message.recv_message_ready == nullptr) &&
   1329       (!batch->recv_trailing_metadata ||
   1330        batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
   1331            nullptr)) {
   1332     if (grpc_client_channel_trace.enabled()) {
   1333       gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand,
   1334               calld);
   1335     }
   1336     pending_batch_clear(calld, pending);
   1337   }
   1338 }
   1339 
   1340 // Returns a pointer to the first pending batch for which predicate(batch)
   1341 // returns true, or null if not found.
   1342 template <typename Predicate>
   1343 static pending_batch* pending_batch_find(grpc_call_element* elem,
   1344                                          const char* log_message,
   1345                                          Predicate predicate) {
   1346   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   1347   call_data* calld = static_cast<call_data*>(elem->call_data);
   1348   for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
   1349     pending_batch* pending = &calld->pending_batches[i];
   1350     grpc_transport_stream_op_batch* batch = pending->batch;
   1351     if (batch != nullptr && predicate(batch)) {
   1352       if (grpc_client_channel_trace.enabled()) {
   1353         gpr_log(GPR_INFO,
   1354                 "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand,
   1355                 calld, log_message, i);
   1356       }
   1357       return pending;
   1358     }
   1359   }
   1360   return nullptr;
   1361 }
   1362 
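        // Usage sketch, mirroring the call sites further below: find the
        // pending batch that is still waiting for initial metadata, if any.
        //
        //   pending_batch* pending = pending_batch_find(
        //       elem, "looking for", [](grpc_transport_stream_op_batch* b) {
        //         return b->recv_initial_metadata &&
        //                b->payload->recv_initial_metadata
        //                        .recv_initial_metadata_ready != nullptr;
        //       });
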
   1363 //
   1364 // retry code
   1365 //
   1366 
   1367 // Commits the call so that no further retry attempts will be performed.
   1368 static void retry_commit(grpc_call_element* elem,
   1369                          subchannel_call_retry_state* retry_state) {
   1370   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   1371   call_data* calld = static_cast<call_data*>(elem->call_data);
   1372   if (calld->retry_committed) return;
   1373   calld->retry_committed = true;
   1374   if (grpc_client_channel_trace.enabled()) {
   1375     gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand, calld);
   1376   }
   1377   if (retry_state != nullptr) {
   1378     free_cached_send_op_data_after_commit(elem, retry_state);
   1379   }
   1380 }
   1381 
   1382 // Starts a retry after appropriate back-off.
   1383 static void do_retry(grpc_call_element* elem,
   1384                      subchannel_call_retry_state* retry_state,
   1385                      grpc_millis server_pushback_ms) {
   1386   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   1387   call_data* calld = static_cast<call_data*>(elem->call_data);
   1388   GPR_ASSERT(calld->method_params != nullptr);
   1389   const ClientChannelMethodParams::RetryPolicy* retry_policy =
   1390       calld->method_params->retry_policy();
   1391   GPR_ASSERT(retry_policy != nullptr);
   1392   // Reset subchannel call and connected subchannel.
   1393   if (calld->subchannel_call != nullptr) {
   1394     GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call,
   1395                                "client_channel_call_retry");
   1396     calld->subchannel_call = nullptr;
   1397   }
   1398   if (calld->pick.connected_subchannel != nullptr) {
   1399     calld->pick.connected_subchannel.reset();
   1400   }
   1401   // Compute backoff delay.
   1402   grpc_millis next_attempt_time;
   1403   if (server_pushback_ms >= 0) {
   1404     next_attempt_time = grpc_core::ExecCtx::Get()->Now() + server_pushback_ms;
   1405     calld->last_attempt_got_server_pushback = true;
   1406   } else {
   1407     if (calld->num_attempts_completed == 1 ||
   1408         calld->last_attempt_got_server_pushback) {
   1409       calld->retry_backoff.Init(
   1410           grpc_core::BackOff::Options()
   1411               .set_initial_backoff(retry_policy->initial_backoff)
   1412               .set_multiplier(retry_policy->backoff_multiplier)
   1413               .set_jitter(RETRY_BACKOFF_JITTER)
   1414               .set_max_backoff(retry_policy->max_backoff));
   1415       calld->last_attempt_got_server_pushback = false;
   1416     }
   1417     next_attempt_time = calld->retry_backoff->NextAttemptTime();
   1418   }
   1419   if (grpc_client_channel_trace.enabled()) {
   1420     gpr_log(GPR_INFO,
   1421             "chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand,
   1422             calld, next_attempt_time - grpc_core::ExecCtx::Get()->Now());
   1423   }
   1424   // Schedule retry after computed delay.
   1425   GRPC_CLOSURE_INIT(&calld->pick_closure, start_pick_locked, elem,
   1426                     grpc_combiner_scheduler(chand->combiner));
   1427   grpc_timer_init(&calld->retry_timer, next_attempt_time, &calld->pick_closure);
   1428   // Update bookkeeping.
   1429   if (retry_state != nullptr) retry_state->retry_dispatched = true;
   1430 }
   1431 
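        // Backoff example with a hypothetical policy (initial_backoff=1s,
        // backoff_multiplier=2, max_backoff=10s): successive failures without
        // server push-back are retried after roughly 1s, 2s, 4s, 8s, and then
        // 10s, each perturbed by RETRY_BACKOFF_JITTER.  A push-back value from
        // the server overrides the computed delay for that attempt and causes
        // the backoff series to be reinitialized on the next failure.
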
   1432 // Returns true if the call is being retried.
   1433 static bool maybe_retry(grpc_call_element* elem,
   1434                         subchannel_batch_data* batch_data,
   1435                         grpc_status_code status,
   1436                         grpc_mdelem* server_pushback_md) {
   1437   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   1438   call_data* calld = static_cast<call_data*>(elem->call_data);
   1439   // Get retry policy.
   1440   if (calld->method_params == nullptr) return false;
   1441   const ClientChannelMethodParams::RetryPolicy* retry_policy =
   1442       calld->method_params->retry_policy();
   1443   if (retry_policy == nullptr) return false;
   1444   // If we've already dispatched a retry from this call, return true.
   1445   // This catches the case where the batch has multiple callbacks
   1446   // (i.e., it includes either recv_message or recv_initial_metadata).
   1447   subchannel_call_retry_state* retry_state = nullptr;
   1448   if (batch_data != nullptr) {
   1449     retry_state = static_cast<subchannel_call_retry_state*>(
   1450         grpc_connected_subchannel_call_get_parent_data(
   1451             batch_data->subchannel_call));
   1452     if (retry_state->retry_dispatched) {
   1453       if (grpc_client_channel_trace.enabled()) {
   1454         gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", chand,
   1455                 calld);
   1456       }
   1457       return true;
   1458     }
   1459   }
   1460   // Check status.
   1461   if (GPR_LIKELY(status == GRPC_STATUS_OK)) {
   1462     if (calld->retry_throttle_data != nullptr) {
   1463       calld->retry_throttle_data->RecordSuccess();
   1464     }
   1465     if (grpc_client_channel_trace.enabled()) {
   1466       gpr_log(GPR_INFO, "chand=%p calld=%p: call succeeded", chand, calld);
   1467     }
   1468     return false;
   1469   }
   1470   // Status is not OK.  Check whether the status is retryable.
   1471   if (!retry_policy->retryable_status_codes.Contains(status)) {
   1472     if (grpc_client_channel_trace.enabled()) {
   1473       gpr_log(GPR_INFO,
   1474               "chand=%p calld=%p: status %s not configured as retryable", chand,
   1475               calld, grpc_status_code_to_string(status));
   1476     }
   1477     return false;
   1478   }
   1479   // Record the failure and check whether retries are throttled.
   1480   // Note that it's important for this check to come after the status
   1481   // code check above, since we should only record failures whose statuses
   1482   // match the configured retryable status codes, so that we don't count
   1483   // things like failures due to malformed requests (INVALID_ARGUMENT).
   1484   // Conversely, it's important for this to come before the remaining
   1485   // checks, so that we don't fail to record failures due to other factors.
   1486   if (calld->retry_throttle_data != nullptr &&
   1487       !calld->retry_throttle_data->RecordFailure()) {
   1488     if (grpc_client_channel_trace.enabled()) {
   1489       gpr_log(GPR_INFO, "chand=%p calld=%p: retries throttled", chand, calld);
   1490     }
   1491     return false;
   1492   }
   1493   // Check whether the call is committed.
   1494   if (calld->retry_committed) {
   1495     if (grpc_client_channel_trace.enabled()) {
   1496       gpr_log(GPR_INFO, "chand=%p calld=%p: retries already committed", chand,
   1497               calld);
   1498     }
   1499     return false;
   1500   }
   1501   // Check whether we have retries remaining.
   1502   ++calld->num_attempts_completed;
   1503   if (calld->num_attempts_completed >= retry_policy->max_attempts) {
   1504     if (grpc_client_channel_trace.enabled()) {
   1505       gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded %d retry attempts", chand,
   1506               calld, retry_policy->max_attempts);
   1507     }
   1508     return false;
   1509   }
   1510   // If the call was cancelled from the surface, don't retry.
   1511   if (calld->cancel_error != GRPC_ERROR_NONE) {
   1512     if (grpc_client_channel_trace.enabled()) {
   1513       gpr_log(GPR_INFO,
   1514               "chand=%p calld=%p: call cancelled from surface, not retrying",
   1515               chand, calld);
   1516     }
   1517     return false;
   1518   }
   1519   // Check server push-back.
   1520   grpc_millis server_pushback_ms = -1;
   1521   if (server_pushback_md != nullptr) {
   1522     // If the value is "-1" or any other unparseable string, we do not retry.
   1523     uint32_t ms;
   1524     if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) {
   1525       if (grpc_client_channel_trace.enabled()) {
   1526         gpr_log(GPR_INFO,
   1527                 "chand=%p calld=%p: not retrying due to server push-back",
   1528                 chand, calld);
   1529       }
   1530       return false;
   1531     } else {
   1532       if (grpc_client_channel_trace.enabled()) {
   1533         gpr_log(GPR_INFO, "chand=%p calld=%p: server push-back: retry in %u ms",
   1534                 chand, calld, ms);
   1535       }
   1536       server_pushback_ms = static_cast<grpc_millis>(ms);
   1537     }
   1538   }
   1539   do_retry(elem, retry_state, server_pushback_ms);
   1540   return true;
   1541 }
   1542 
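        // The retry policy consulted above comes from the per-method service
        // config.  A minimal illustrative fragment (field names per the gRPC
        // retry design, gRFC A6):
        //
        //   "retryPolicy": {
        //     "maxAttempts": 4,
        //     "initialBackoff": "0.1s",
        //     "maxBackoff": "1s",
        //     "backoffMultiplier": 2,
        //     "retryableStatusCodes": [ "UNAVAILABLE" ]
        //   }
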
   1543 //
   1544 // subchannel_batch_data
   1545 //
   1546 
   1547 // Creates a subchannel_batch_data object on the call's arena with the
   1548 // specified refcount.  If set_on_complete is true, the batch's
   1549 // on_complete callback will be set to point to on_complete();
   1550 // otherwise, the batch's on_complete callback will be null.
   1551 static subchannel_batch_data* batch_data_create(grpc_call_element* elem,
   1552                                                 int refcount,
   1553                                                 bool set_on_complete) {
   1554   call_data* calld = static_cast<call_data*>(elem->call_data);
   1555   subchannel_call_retry_state* retry_state =
   1556       static_cast<subchannel_call_retry_state*>(
   1557           grpc_connected_subchannel_call_get_parent_data(
   1558               calld->subchannel_call));
   1559   subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(
   1560       gpr_arena_alloc(calld->arena, sizeof(*batch_data)));
   1561   batch_data->elem = elem;
   1562   batch_data->subchannel_call =
   1563       GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call, "batch_data_create");
   1564   batch_data->batch.payload = &retry_state->batch_payload;
   1565   gpr_ref_init(&batch_data->refs, refcount);
   1566   if (set_on_complete) {
   1567     GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data,
   1568                       grpc_schedule_on_exec_ctx);
   1569     batch_data->batch.on_complete = &batch_data->on_complete;
   1570   }
   1571   GRPC_CALL_STACK_REF(calld->owning_call, "batch_data");
   1572   return batch_data;
   1573 }
   1574 
   1575 static void batch_data_unref(subchannel_batch_data* batch_data) {
   1576   if (gpr_unref(&batch_data->refs)) {
   1577     subchannel_call_retry_state* retry_state =
   1578         static_cast<subchannel_call_retry_state*>(
   1579             grpc_connected_subchannel_call_get_parent_data(
   1580                 batch_data->subchannel_call));
   1581     if (batch_data->batch.send_initial_metadata) {
   1582       grpc_metadata_batch_destroy(&retry_state->send_initial_metadata);
   1583     }
   1584     if (batch_data->batch.send_trailing_metadata) {
   1585       grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata);
   1586     }
   1587     if (batch_data->batch.recv_initial_metadata) {
   1588       grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata);
   1589     }
   1590     if (batch_data->batch.recv_trailing_metadata) {
   1591       grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata);
   1592     }
   1593     GRPC_SUBCHANNEL_CALL_UNREF(batch_data->subchannel_call, "batch_data_unref");
   1594     call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
   1595     GRPC_CALL_STACK_UNREF(calld->owning_call, "batch_data");
   1596   }
   1597 }
   1598 
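        // Lifecycle note: the refcount passed to batch_data_create() counts
        // the callbacks that may still fire for the batch (on_complete plus
        // any intercepted recv callbacks), and each callback path unrefs once,
        // so the per-attempt metadata owned by retry_state is destroyed only
        // after the last callback has run.
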
   1599 //
   1600 // recv_initial_metadata callback handling
   1601 //
   1602 
   1603 // Invokes recv_initial_metadata_ready for a subchannel batch.
   1604 static void invoke_recv_initial_metadata_callback(void* arg,
   1605                                                   grpc_error* error) {
   1606   subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
   1607   // Find pending batch.
   1608   pending_batch* pending = pending_batch_find(
   1609       batch_data->elem, "invoking recv_initial_metadata_ready for",
   1610       [](grpc_transport_stream_op_batch* batch) {
   1611         return batch->recv_initial_metadata &&
   1612                batch->payload->recv_initial_metadata
   1613                        .recv_initial_metadata_ready != nullptr;
   1614       });
   1615   GPR_ASSERT(pending != nullptr);
   1616   // Return metadata.
   1617   subchannel_call_retry_state* retry_state =
   1618       static_cast<subchannel_call_retry_state*>(
   1619           grpc_connected_subchannel_call_get_parent_data(
   1620               batch_data->subchannel_call));
   1621   grpc_metadata_batch_move(
   1622       &retry_state->recv_initial_metadata,
   1623       pending->batch->payload->recv_initial_metadata.recv_initial_metadata);
   1624   // Update bookkeeping.
   1625   // Note: Need to do this before invoking the callback, since invoking
   1626   // the callback will result in yielding the call combiner.
   1627   grpc_closure* recv_initial_metadata_ready =
   1628       pending->batch->payload->recv_initial_metadata
   1629           .recv_initial_metadata_ready;
   1630   pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
   1631       nullptr;
   1632   maybe_clear_pending_batch(batch_data->elem, pending);
   1633   batch_data_unref(batch_data);
   1634   // Invoke callback.
   1635   GRPC_CLOSURE_RUN(recv_initial_metadata_ready, GRPC_ERROR_REF(error));
   1636 }
   1637 
   1638 // Intercepts recv_initial_metadata_ready callback for retries.
   1639 // Commits the call and returns the initial metadata up the stack.
   1640 static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
   1641   subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
   1642   grpc_call_element* elem = batch_data->elem;
   1643   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   1644   call_data* calld = static_cast<call_data*>(elem->call_data);
   1645   if (grpc_client_channel_trace.enabled()) {
   1646     gpr_log(GPR_INFO,
   1647             "chand=%p calld=%p: got recv_initial_metadata_ready, error=%s",
   1648             chand, calld, grpc_error_string(error));
   1649   }
   1650   subchannel_call_retry_state* retry_state =
   1651       static_cast<subchannel_call_retry_state*>(
   1652           grpc_connected_subchannel_call_get_parent_data(
   1653               batch_data->subchannel_call));
   1654   retry_state->completed_recv_initial_metadata = true;
   1655   // If a retry was already dispatched, then we're not going to use the
   1656   // result of this recv_initial_metadata op, so do nothing.
   1657   if (retry_state->retry_dispatched) {
   1658     GRPC_CALL_COMBINER_STOP(
   1659         calld->call_combiner,
   1660         "recv_initial_metadata_ready after retry dispatched");
   1661     return;
   1662   }
   1663   // If we got an error or a Trailers-Only response and have not yet gotten
   1664   // the recv_trailing_metadata_ready callback, then defer propagating this
   1665   // callback back to the surface.  We can evaluate whether to retry when
   1666   // recv_trailing_metadata comes back.
   1667   if (GPR_UNLIKELY((retry_state->trailing_metadata_available ||
   1668                     error != GRPC_ERROR_NONE) &&
   1669                    !retry_state->completed_recv_trailing_metadata)) {
   1670     if (grpc_client_channel_trace.enabled()) {
   1671       gpr_log(GPR_INFO,
   1672               "chand=%p calld=%p: deferring recv_initial_metadata_ready "
   1673               "(Trailers-Only)",
   1674               chand, calld);
   1675     }
   1676     retry_state->recv_initial_metadata_ready_deferred_batch = batch_data;
   1677     retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error);
   1678     if (!retry_state->started_recv_trailing_metadata) {
   1679       // recv_trailing_metadata not yet started by application; start it
   1680       // ourselves to get status.
   1681       start_internal_recv_trailing_metadata(elem);
   1682     } else {
   1683       GRPC_CALL_COMBINER_STOP(
   1684           calld->call_combiner,
   1685           "recv_initial_metadata_ready trailers-only or error");
   1686     }
   1687     return;
   1688   }
   1689   // Received valid initial metadata, so commit the call.
   1690   retry_commit(elem, retry_state);
   1691   // Invoke the callback to return the result to the surface.
   1692   // Manually invoking a callback function; it does not take ownership of error.
   1693   invoke_recv_initial_metadata_callback(batch_data, error);
   1694 }
   1695 
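        // The deferral above exists because a Trailers-Only response (status
        // with no initial metadata or messages) is the normal shape of a
        // failed attempt: the parked recv_initial_metadata_ready callback is
        // released by recv_trailing_metadata_ready(), which decides between
        // retrying and returning the result to the surface.
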
   1696 //
   1697 // recv_message callback handling
   1698 //
   1699 
   1700 // Invokes recv_message_ready for a subchannel batch.
   1701 static void invoke_recv_message_callback(void* arg, grpc_error* error) {
   1702   subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
   1703   // Find pending op.
   1704   pending_batch* pending = pending_batch_find(
   1705       batch_data->elem, "invoking recv_message_ready for",
   1706       [](grpc_transport_stream_op_batch* batch) {
   1707         return batch->recv_message &&
   1708                batch->payload->recv_message.recv_message_ready != nullptr;
   1709       });
   1710   GPR_ASSERT(pending != nullptr);
   1711   // Return payload.
   1712   subchannel_call_retry_state* retry_state =
   1713       static_cast<subchannel_call_retry_state*>(
   1714           grpc_connected_subchannel_call_get_parent_data(
   1715               batch_data->subchannel_call));
   1716   *pending->batch->payload->recv_message.recv_message =
   1717       std::move(retry_state->recv_message);
   1718   // Update bookkeeping.
   1719   // Note: Need to do this before invoking the callback, since invoking
   1720   // the callback will result in yielding the call combiner.
   1721   grpc_closure* recv_message_ready =
   1722       pending->batch->payload->recv_message.recv_message_ready;
   1723   pending->batch->payload->recv_message.recv_message_ready = nullptr;
   1724   maybe_clear_pending_batch(batch_data->elem, pending);
   1725   batch_data_unref(batch_data);
   1726   // Invoke callback.
   1727   GRPC_CLOSURE_RUN(recv_message_ready, GRPC_ERROR_REF(error));
   1728 }
   1729 
   1730 // Intercepts recv_message_ready callback for retries.
   1731 // Commits the call and returns the message up the stack.
   1732 static void recv_message_ready(void* arg, grpc_error* error) {
   1733   subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
   1734   grpc_call_element* elem = batch_data->elem;
   1735   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   1736   call_data* calld = static_cast<call_data*>(elem->call_data);
   1737   if (grpc_client_channel_trace.enabled()) {
   1738     gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_message_ready, error=%s",
   1739             chand, calld, grpc_error_string(error));
   1740   }
   1741   subchannel_call_retry_state* retry_state =
   1742       static_cast<subchannel_call_retry_state*>(
   1743           grpc_connected_subchannel_call_get_parent_data(
   1744               batch_data->subchannel_call));
   1745   ++retry_state->completed_recv_message_count;
   1746   // If a retry was already dispatched, then we're not going to use the
   1747   // result of this recv_message op, so do nothing.
   1748   if (retry_state->retry_dispatched) {
   1749     GRPC_CALL_COMBINER_STOP(calld->call_combiner,
   1750                             "recv_message_ready after retry dispatched");
   1751     return;
   1752   }
   1753   // If we got an error or the payload was nullptr and we have not yet gotten
   1754   // the recv_trailing_metadata_ready callback, then defer propagating this
   1755   // callback back to the surface.  We can evaluate whether to retry when
   1756   // recv_trailing_metadata comes back.
   1757   if (GPR_UNLIKELY(
   1758           (retry_state->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
   1759           !retry_state->completed_recv_trailing_metadata)) {
   1760     if (grpc_client_channel_trace.enabled()) {
   1761       gpr_log(GPR_INFO,
   1762               "chand=%p calld=%p: deferring recv_message_ready (nullptr "
   1763               "message and recv_trailing_metadata pending)",
   1764               chand, calld);
   1765     }
   1766     retry_state->recv_message_ready_deferred_batch = batch_data;
   1767     retry_state->recv_message_error = GRPC_ERROR_REF(error);
   1768     if (!retry_state->started_recv_trailing_metadata) {
   1769       // recv_trailing_metadata not yet started by application; start it
   1770       // ourselves to get status.
   1771       start_internal_recv_trailing_metadata(elem);
   1772     } else {
   1773       GRPC_CALL_COMBINER_STOP(calld->call_combiner, "recv_message_ready null");
   1774     }
   1775     return;
   1776   }
   1777   // Received a valid message, so commit the call.
   1778   retry_commit(elem, retry_state);
   1779   // Invoke the callback to return the result to the surface.
   1780   // Manually invoking a callback function; it does not take ownership of error.
   1781   invoke_recv_message_callback(batch_data, error);
   1782 }
   1783 
   1784 //
   1785 // recv_trailing_metadata handling
   1786 //
   1787 
   1788 // Sets *status and *server_pushback_md based on md_batch and error.
   1789 // Only sets *server_pushback_md if server_pushback_md != nullptr.
   1790 static void get_call_status(grpc_call_element* elem,
   1791                             grpc_metadata_batch* md_batch, grpc_error* error,
   1792                             grpc_status_code* status,
   1793                             grpc_mdelem** server_pushback_md) {
   1794   call_data* calld = static_cast<call_data*>(elem->call_data);
   1795   if (error != GRPC_ERROR_NONE) {
   1796     grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr,
   1797                           nullptr);
   1798   } else {
   1799     GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
   1800     *status =
   1801         grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
   1802     if (server_pushback_md != nullptr &&
   1803         md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
   1804       *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
   1805     }
   1806   }
   1807   GRPC_ERROR_UNREF(error);
   1808 }
   1809 
   1810 // Adds recv_trailing_metadata_ready closure to closures.
   1811 static void add_closure_for_recv_trailing_metadata_ready(
   1812     grpc_call_element* elem, subchannel_batch_data* batch_data,
   1813     grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
   1814   // Find pending batch.
   1815   pending_batch* pending = pending_batch_find(
   1816       elem, "invoking recv_trailing_metadata for",
   1817       [](grpc_transport_stream_op_batch* batch) {
   1818         return batch->recv_trailing_metadata &&
   1819                batch->payload->recv_trailing_metadata
   1820                        .recv_trailing_metadata_ready != nullptr;
   1821       });
   1822   // If we generated the recv_trailing_metadata op internally via
   1823   // start_internal_recv_trailing_metadata(), then there will be no
   1824   // pending batch.
   1825   if (pending == nullptr) {
   1826     GRPC_ERROR_UNREF(error);
   1827     return;
   1828   }
   1829   // Return metadata.
   1830   subchannel_call_retry_state* retry_state =
   1831       static_cast<subchannel_call_retry_state*>(
   1832           grpc_connected_subchannel_call_get_parent_data(
   1833               batch_data->subchannel_call));
   1834   grpc_metadata_batch_move(
   1835       &retry_state->recv_trailing_metadata,
   1836       pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
   1837   // Add closure.
   1838   closures->Add(pending->batch->payload->recv_trailing_metadata
   1839                     .recv_trailing_metadata_ready,
   1840                 error, "recv_trailing_metadata_ready for pending batch");
   1841   // Update bookkeeping.
   1842   pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
   1843       nullptr;
   1844   maybe_clear_pending_batch(elem, pending);
   1845 }
   1846 
   1847 // Adds any necessary closures for deferred recv_initial_metadata and
   1848 // recv_message callbacks to closures.
   1849 static void add_closures_for_deferred_recv_callbacks(
   1850     subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state,
   1851     grpc_core::CallCombinerClosureList* closures) {
   1852   if (batch_data->batch.recv_trailing_metadata) {
   1853     // Add closure for deferred recv_initial_metadata_ready.
   1854     if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
   1855                      nullptr)) {
   1856       GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
   1857                         invoke_recv_initial_metadata_callback,
   1858                         retry_state->recv_initial_metadata_ready_deferred_batch,
   1859                         grpc_schedule_on_exec_ctx);
   1860       closures->Add(&retry_state->recv_initial_metadata_ready,
   1861                     retry_state->recv_initial_metadata_error,
   1862                     "resuming recv_initial_metadata_ready");
   1863       retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
   1864     }
   1865     // Add closure for deferred recv_message_ready.
   1866     if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
   1867                      nullptr)) {
   1868       GRPC_CLOSURE_INIT(&retry_state->recv_message_ready,
   1869                         invoke_recv_message_callback,
   1870                         retry_state->recv_message_ready_deferred_batch,
   1871                         grpc_schedule_on_exec_ctx);
   1872       closures->Add(&retry_state->recv_message_ready,
   1873                     retry_state->recv_message_error,
   1874                     "resuming recv_message_ready");
   1875       retry_state->recv_message_ready_deferred_batch = nullptr;
   1876     }
   1877   }
   1878 }
   1879 
   1880 // Returns true if any op in the batch was not yet started.
   1881 // Only looks at send ops, since recv ops are always started immediately.
   1882 static bool pending_batch_is_unstarted(
   1883     pending_batch* pending, call_data* calld,
   1884     subchannel_call_retry_state* retry_state) {
   1885   if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
   1886     return false;
   1887   }
   1888   if (pending->batch->send_initial_metadata &&
   1889       !retry_state->started_send_initial_metadata) {
   1890     return true;
   1891   }
   1892   if (pending->batch->send_message &&
   1893       retry_state->started_send_message_count < calld->send_messages->size()) {
   1894     return true;
   1895   }
   1896   if (pending->batch->send_trailing_metadata &&
   1897       !retry_state->started_send_trailing_metadata) {
   1898     return true;
   1899   }
   1900   return false;
   1901 }
   1902 
   1903 // For any pending batch containing an op that has not yet been started,
   1904 // adds the pending batch's completion closures to closures.
   1905 static void add_closures_to_fail_unstarted_pending_batches(
   1906     grpc_call_element* elem, subchannel_call_retry_state* retry_state,
   1907     grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
   1908   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   1909   call_data* calld = static_cast<call_data*>(elem->call_data);
   1910   for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
   1911     pending_batch* pending = &calld->pending_batches[i];
   1912     if (pending_batch_is_unstarted(pending, calld, retry_state)) {
   1913       if (grpc_client_channel_trace.enabled()) {
   1914         gpr_log(GPR_INFO,
   1915                 "chand=%p calld=%p: failing unstarted pending batch at index "
   1916                 "%" PRIuPTR,
   1917                 chand, calld, i);
   1918       }
   1919       closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
   1920                     "failing on_complete for pending batch");
   1921       pending->batch->on_complete = nullptr;
   1922       maybe_clear_pending_batch(elem, pending);
   1923     }
   1924   }
   1925   GRPC_ERROR_UNREF(error);
   1926 }
   1927 
   1928 // Runs necessary closures upon completion of a call attempt.
   1929 static void run_closures_for_completed_call(subchannel_batch_data* batch_data,
   1930                                             grpc_error* error) {
   1931   grpc_call_element* elem = batch_data->elem;
   1932   call_data* calld = static_cast<call_data*>(elem->call_data);
   1933   subchannel_call_retry_state* retry_state =
   1934       static_cast<subchannel_call_retry_state*>(
   1935           grpc_connected_subchannel_call_get_parent_data(
   1936               batch_data->subchannel_call));
   1937   // Construct list of closures to execute.
   1938   grpc_core::CallCombinerClosureList closures;
   1939   // First, add closure for recv_trailing_metadata_ready.
   1940   add_closure_for_recv_trailing_metadata_ready(
   1941       elem, batch_data, GRPC_ERROR_REF(error), &closures);
   1942   // If there are deferred recv_initial_metadata_ready or recv_message_ready
   1943   // callbacks, add them to closures.
   1944   add_closures_for_deferred_recv_callbacks(batch_data, retry_state, &closures);
   1945   // Add closures to fail any pending batches that have not yet been started.
   1946   add_closures_to_fail_unstarted_pending_batches(
   1947       elem, retry_state, GRPC_ERROR_REF(error), &closures);
   1948   // Don't need batch_data anymore.
   1949   batch_data_unref(batch_data);
   1950   // Schedule all of the closures identified above.
   1951   // Note: This will release the call combiner.
   1952   closures.RunClosures(calld->call_combiner);
   1953   GRPC_ERROR_UNREF(error);
   1954 }
   1955 
   1956 // Intercepts recv_trailing_metadata_ready callback for retries.
   1957 // Commits the call and returns the trailing metadata up the stack.
   1958 static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
   1959   subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
   1960   grpc_call_element* elem = batch_data->elem;
   1961   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   1962   call_data* calld = static_cast<call_data*>(elem->call_data);
   1963   if (grpc_client_channel_trace.enabled()) {
   1964     gpr_log(GPR_INFO,
   1965             "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s",
   1966             chand, calld, grpc_error_string(error));
   1967   }
   1968   subchannel_call_retry_state* retry_state =
   1969       static_cast<subchannel_call_retry_state*>(
   1970           grpc_connected_subchannel_call_get_parent_data(
   1971               batch_data->subchannel_call));
   1972   retry_state->completed_recv_trailing_metadata = true;
   1973   // Get the call's status and check for server pushback metadata.
   1974   grpc_status_code status = GRPC_STATUS_OK;
   1975   grpc_mdelem* server_pushback_md = nullptr;
   1976   grpc_metadata_batch* md_batch =
   1977       batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata;
   1978   get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status,
   1979                   &server_pushback_md);
   1980   grpc_core::channelz::SubchannelNode* channelz_subchannel =
   1981       calld->pick.connected_subchannel->channelz_subchannel();
   1982   if (channelz_subchannel != nullptr) {
   1983     if (status == GRPC_STATUS_OK) {
   1984       channelz_subchannel->RecordCallSucceeded();
   1985     } else {
   1986       channelz_subchannel->RecordCallFailed();
   1987     }
   1988   }
   1989   if (grpc_client_channel_trace.enabled()) {
   1990     gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
   1991             calld, grpc_status_code_to_string(status));
   1992   }
   1993   // Check if we should retry.
   1994   if (maybe_retry(elem, batch_data, status, server_pushback_md)) {
   1995     // Unref batch_data for deferred recv_initial_metadata_ready or
   1996     // recv_message_ready callbacks, if any.
   1997     if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
   1998       batch_data_unref(batch_data);
   1999       GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
   2000     }
   2001     if (retry_state->recv_message_ready_deferred_batch != nullptr) {
   2002       batch_data_unref(batch_data);
   2003       GRPC_ERROR_UNREF(retry_state->recv_message_error);
   2004     }
   2005     batch_data_unref(batch_data);
   2006     return;
   2007   }
   2008   // Not retrying, so commit the call.
   2009   retry_commit(elem, retry_state);
   2010   // Run any necessary closures.
   2011   run_closures_for_completed_call(batch_data, GRPC_ERROR_REF(error));
   2012 }
   2013 
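        // recv_trailing_metadata_ready() above is the single retry decision
        // point: every attempt funnels its final status here, maybe_retry()
        // either schedules the next attempt or declines, and only in the
        // latter case are the (possibly deferred) results propagated to the
        // surface via run_closures_for_completed_call().
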
   2014 //
   2015 // on_complete callback handling
   2016 //
   2017 
   2018 // Adds the on_complete closure for the pending batch completed in
   2019 // batch_data to closures.
   2020 static void add_closure_for_completed_pending_batch(
   2021     grpc_call_element* elem, subchannel_batch_data* batch_data,
   2022     subchannel_call_retry_state* retry_state, grpc_error* error,
   2023     grpc_core::CallCombinerClosureList* closures) {
   2024   pending_batch* pending = pending_batch_find(
   2025       elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
   2026         // Match the pending batch with the same set of send ops as the
   2027         // subchannel batch we've just completed.
   2028         return batch->on_complete != nullptr &&
   2029                batch_data->batch.send_initial_metadata ==
   2030                    batch->send_initial_metadata &&
   2031                batch_data->batch.send_message == batch->send_message &&
   2032                batch_data->batch.send_trailing_metadata ==
   2033                    batch->send_trailing_metadata;
   2034       });
   2035   // If batch_data is a replay batch, then there will be no pending
   2036   // batch to complete.
   2037   if (pending == nullptr) {
   2038     GRPC_ERROR_UNREF(error);
   2039     return;
   2040   }
   2041   // Add closure.
   2042   closures->Add(pending->batch->on_complete, error,
   2043                 "on_complete for pending batch");
   2044   pending->batch->on_complete = nullptr;
   2045   maybe_clear_pending_batch(elem, pending);
   2046 }
   2047 
   2048 // If there are any cached ops to replay or pending ops to start on the
   2049 // subchannel call, adds a closure to closures to invoke
   2050 // start_retriable_subchannel_batches().
   2051 static void add_closures_for_replay_or_pending_send_ops(
   2052     grpc_call_element* elem, subchannel_batch_data* batch_data,
   2053     subchannel_call_retry_state* retry_state,
   2054     grpc_core::CallCombinerClosureList* closures) {
   2055   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   2056   call_data* calld = static_cast<call_data*>(elem->call_data);
   2057   bool have_pending_send_message_ops =
   2058       retry_state->started_send_message_count < calld->send_messages->size();
   2059   bool have_pending_send_trailing_metadata_op =
   2060       calld->seen_send_trailing_metadata &&
   2061       !retry_state->started_send_trailing_metadata;
   2062   if (!have_pending_send_message_ops &&
   2063       !have_pending_send_trailing_metadata_op) {
   2064     for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
   2065       pending_batch* pending = &calld->pending_batches[i];
   2066       grpc_transport_stream_op_batch* batch = pending->batch;
   2067       if (batch == nullptr || pending->send_ops_cached) continue;
   2068       if (batch->send_message) have_pending_send_message_ops = true;
   2069       if (batch->send_trailing_metadata) {
   2070         have_pending_send_trailing_metadata_op = true;
   2071       }
   2072     }
   2073   }
   2074   if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) {
   2075     if (grpc_client_channel_trace.enabled()) {
   2076       gpr_log(GPR_INFO,
   2077               "chand=%p calld=%p: starting next batch for pending send op(s)",
   2078               chand, calld);
   2079     }
   2080     GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure,
   2081                       start_retriable_subchannel_batches, elem,
   2082                       grpc_schedule_on_exec_ctx);
   2083     closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE,
   2084                   "starting next batch for send_* op(s)");
   2085   }
   2086 }
   2087 
   2088 // Callback used to intercept on_complete from subchannel calls.
   2089 // Called only when retries are enabled.
   2090 static void on_complete(void* arg, grpc_error* error) {
   2091   subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
   2092   grpc_call_element* elem = batch_data->elem;
   2093   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   2094   call_data* calld = static_cast<call_data*>(elem->call_data);
   2095   if (grpc_client_channel_trace.enabled()) {
   2096     char* batch_str = grpc_transport_stream_op_batch_string(&batch_data->batch);
   2097     gpr_log(GPR_INFO, "chand=%p calld=%p: got on_complete, error=%s, batch=%s",
   2098             chand, calld, grpc_error_string(error), batch_str);
   2099     gpr_free(batch_str);
   2100   }
   2101   subchannel_call_retry_state* retry_state =
   2102       static_cast<subchannel_call_retry_state*>(
   2103           grpc_connected_subchannel_call_get_parent_data(
   2104               batch_data->subchannel_call));
   2105   // Update bookkeeping in retry_state.
   2106   if (batch_data->batch.send_initial_metadata) {
   2107     retry_state->completed_send_initial_metadata = true;
   2108   }
   2109   if (batch_data->batch.send_message) {
   2110     ++retry_state->completed_send_message_count;
   2111   }
   2112   if (batch_data->batch.send_trailing_metadata) {
   2113     retry_state->completed_send_trailing_metadata = true;
   2114   }
   2115   // If the call is committed, free cached data for send ops that we've just
   2116   // completed.
   2117   if (calld->retry_committed) {
   2118     free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state);
   2119   }
   2120   // Construct list of closures to execute.
   2121   grpc_core::CallCombinerClosureList closures;
   2122   // If a retry was already dispatched, that means we saw
   2123   // recv_trailing_metadata before this, so we do nothing here.
   2124   // Otherwise, invoke the callback to return the result to the surface.
   2125   if (!retry_state->retry_dispatched) {
   2126     // Add closure for the completed pending batch, if any.
   2127     add_closure_for_completed_pending_batch(elem, batch_data, retry_state,
   2128                                             GRPC_ERROR_REF(error), &closures);
   2129     // If needed, add a callback to start any replay or pending send ops on
   2130     // the subchannel call.
   2131     if (!retry_state->completed_recv_trailing_metadata) {
   2132       add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
   2133                                                   &closures);
   2134     }
   2135   }
   2136   // Track number of pending subchannel send batches and determine if this
   2137   // was the last one.
   2138   --calld->num_pending_retriable_subchannel_send_batches;
   2139   const bool last_send_batch_complete =
   2140       calld->num_pending_retriable_subchannel_send_batches == 0;
   2141   // Don't need batch_data anymore.
   2142   batch_data_unref(batch_data);
   2143   // Schedule all of the closures identified above.
   2144   // Note: This yields the call combiner.
   2145   closures.RunClosures(calld->call_combiner);
   2146   // If this was the last subchannel send batch, unref the call stack.
   2147   if (last_send_batch_complete) {
   2148     GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
   2149   }
   2150 }
   2151 
   2152 //
   2153 // subchannel batch construction
   2154 //
   2155 
   2156 // Helper function used to start a subchannel batch in the call combiner.
   2157 static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) {
   2158   grpc_transport_stream_op_batch* batch =
   2159       static_cast<grpc_transport_stream_op_batch*>(arg);
   2160   grpc_subchannel_call* subchannel_call =
   2161       static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg);
   2162   // Note: This will release the call combiner.
   2163   grpc_subchannel_call_process_op(subchannel_call, batch);
   2164 }
   2165 
   2166 // Adds a closure to closures that will execute batch in the call combiner.
   2167 static void add_closure_for_subchannel_batch(
   2168     grpc_call_element* elem, grpc_transport_stream_op_batch* batch,
   2169     grpc_core::CallCombinerClosureList* closures) {
   2170   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   2171   call_data* calld = static_cast<call_data*>(elem->call_data);
   2172   batch->handler_private.extra_arg = calld->subchannel_call;
   2173   GRPC_CLOSURE_INIT(&batch->handler_private.closure,
   2174                     start_batch_in_call_combiner, batch,
   2175                     grpc_schedule_on_exec_ctx);
   2176   if (grpc_client_channel_trace.enabled()) {
   2177     char* batch_str = grpc_transport_stream_op_batch_string(batch);
   2178     gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand,
   2179             calld, batch_str);
   2180     gpr_free(batch_str);
   2181   }
   2182   closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
   2183                 "start_subchannel_batch");
   2184 }
   2185 
   2186 // Adds retriable send_initial_metadata op to batch_data.
   2187 static void add_retriable_send_initial_metadata_op(
   2188     call_data* calld, subchannel_call_retry_state* retry_state,
   2189     subchannel_batch_data* batch_data) {
   2190   // Maps the number of retries to the corresponding metadata value slice.
   2191   static const grpc_slice* retry_count_strings[] = {
   2192       &GRPC_MDSTR_1, &GRPC_MDSTR_2, &GRPC_MDSTR_3, &GRPC_MDSTR_4};
   2193   // We need to make a copy of the metadata batch for each attempt, since
   2194   // the filters in the subchannel stack may modify this batch, and we don't
   2195   // want those modifications to be passed forward to subsequent attempts.
   2196   //
   2197   // If we've already completed one or more attempts, add the
   2198   // grpc-previous-rpc-attempts header.
   2199   retry_state->send_initial_metadata_storage =
   2200       static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
   2201           calld->arena, sizeof(grpc_linked_mdelem) *
   2202                             (calld->send_initial_metadata.list.count +
   2203                              (calld->num_attempts_completed > 0))));
   2204   grpc_metadata_batch_copy(&calld->send_initial_metadata,
   2205                            &retry_state->send_initial_metadata,
   2206                            retry_state->send_initial_metadata_storage);
   2207   if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named
   2208                        .grpc_previous_rpc_attempts != nullptr)) {
   2209     grpc_metadata_batch_remove(&retry_state->send_initial_metadata,
   2210                                retry_state->send_initial_metadata.idx.named
   2211                                    .grpc_previous_rpc_attempts);
   2212   }
   2213   if (GPR_UNLIKELY(calld->num_attempts_completed > 0)) {
   2214     grpc_mdelem retry_md = grpc_mdelem_from_slices(
   2215         GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS,
   2216         *retry_count_strings[calld->num_attempts_completed - 1]);
   2217     grpc_error* error = grpc_metadata_batch_add_tail(
   2218         &retry_state->send_initial_metadata,
   2219         &retry_state->send_initial_metadata_storage[calld->send_initial_metadata
   2220                                                         .list.count],
   2221         retry_md);
   2222     if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
   2223       gpr_log(GPR_ERROR, "error adding retry metadata: %s",
   2224               grpc_error_string(error));
   2225       GPR_ASSERT(false);
   2226     }
   2227   }
   2228   retry_state->started_send_initial_metadata = true;
   2229   batch_data->batch.send_initial_metadata = true;
   2230   batch_data->batch.payload->send_initial_metadata.send_initial_metadata =
   2231       &retry_state->send_initial_metadata;
   2232   batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags =
   2233       calld->send_initial_metadata_flags;
   2234   batch_data->batch.payload->send_initial_metadata.peer_string =
   2235       calld->peer_string;
   2236 }
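
         // For illustration: on the third attempt of a call (i.e., with
         // calld->num_attempts_completed == 2), the copied initial metadata
         // would carry a header roughly like
         //
         //   grpc-previous-rpc-attempts: 2
         //
         // taking its value slice from retry_count_strings[1] above.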
   2237 
   2238 // Adds retriable send_message op to batch_data.
   2239 static void add_retriable_send_message_op(
   2240     grpc_call_element* elem, subchannel_call_retry_state* retry_state,
   2241     subchannel_batch_data* batch_data) {
   2242   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   2243   call_data* calld = static_cast<call_data*>(elem->call_data);
   2244   if (grpc_client_channel_trace.enabled()) {
   2245     gpr_log(GPR_INFO,
   2246             "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]",
   2247             chand, calld, retry_state->started_send_message_count);
   2248   }
   2249   grpc_core::ByteStreamCache* cache =
   2250       (*calld->send_messages)[retry_state->started_send_message_count];
   2251   ++retry_state->started_send_message_count;
   2252   retry_state->send_message.Init(cache);
   2253   batch_data->batch.send_message = true;
   2254   batch_data->batch.payload->send_message.send_message.reset(
   2255       retry_state->send_message.get());
   2256 }
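
         // Note: each message is replayed from calld->send_messages, which
         // holds one ByteStreamCache per send_message op, populated when the
         // op was first seen.  The caching stream constructed into
         // retry_state->send_message reads from that cache, which is what
         // allows the same payload to be re-sent on a later attempt.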
   2257 
   2258 // Adds retriable send_trailing_metadata op to batch_data.
   2259 static void add_retriable_send_trailing_metadata_op(
   2260     call_data* calld, subchannel_call_retry_state* retry_state,
   2261     subchannel_batch_data* batch_data) {
   2262   // We need to make a copy of the metadata batch for each attempt, since
   2263   // the filters in the subchannel stack may modify this batch, and we don't
   2264   // want those modifications to be passed forward to subsequent attempts.
   2265   retry_state->send_trailing_metadata_storage =
   2266       static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
   2267           calld->arena, sizeof(grpc_linked_mdelem) *
   2268                             calld->send_trailing_metadata.list.count));
   2269   grpc_metadata_batch_copy(&calld->send_trailing_metadata,
   2270                            &retry_state->send_trailing_metadata,
   2271                            retry_state->send_trailing_metadata_storage);
   2272   retry_state->started_send_trailing_metadata = true;
   2273   batch_data->batch.send_trailing_metadata = true;
   2274   batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata =
   2275       &retry_state->send_trailing_metadata;
   2276 }
   2277 
   2278 // Adds retriable recv_initial_metadata op to batch_data.
   2279 static void add_retriable_recv_initial_metadata_op(
   2280     call_data* calld, subchannel_call_retry_state* retry_state,
   2281     subchannel_batch_data* batch_data) {
   2282   retry_state->started_recv_initial_metadata = true;
   2283   batch_data->batch.recv_initial_metadata = true;
   2284   grpc_metadata_batch_init(&retry_state->recv_initial_metadata);
   2285   batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata =
   2286       &retry_state->recv_initial_metadata;
   2287   batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available =
   2288       &retry_state->trailing_metadata_available;
   2289   GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
   2290                     recv_initial_metadata_ready, batch_data,
   2291                     grpc_schedule_on_exec_ctx);
   2292   batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready =
   2293       &retry_state->recv_initial_metadata_ready;
   2294 }
   2295 
   2296 // Adds retriable recv_message op to batch_data.
   2297 static void add_retriable_recv_message_op(
   2298     call_data* calld, subchannel_call_retry_state* retry_state,
   2299     subchannel_batch_data* batch_data) {
   2300   ++retry_state->started_recv_message_count;
   2301   batch_data->batch.recv_message = true;
   2302   batch_data->batch.payload->recv_message.recv_message =
   2303       &retry_state->recv_message;
   2304   GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, recv_message_ready,
   2305                     batch_data, grpc_schedule_on_exec_ctx);
   2306   batch_data->batch.payload->recv_message.recv_message_ready =
   2307       &retry_state->recv_message_ready;
   2308 }
   2309 
   2310 // Adds retriable recv_trailing_metadata op to batch_data.
   2311 static void add_retriable_recv_trailing_metadata_op(
   2312     call_data* calld, subchannel_call_retry_state* retry_state,
   2313     subchannel_batch_data* batch_data) {
   2314   retry_state->started_recv_trailing_metadata = true;
   2315   batch_data->batch.recv_trailing_metadata = true;
   2316   grpc_metadata_batch_init(&retry_state->recv_trailing_metadata);
   2317   batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
   2318       &retry_state->recv_trailing_metadata;
   2319   batch_data->batch.payload->recv_trailing_metadata.collect_stats =
   2320       &retry_state->collect_stats;
   2321   GRPC_CLOSURE_INIT(&retry_state->recv_trailing_metadata_ready,
   2322                     recv_trailing_metadata_ready, batch_data,
   2323                     grpc_schedule_on_exec_ctx);
   2324   batch_data->batch.payload->recv_trailing_metadata
   2325       .recv_trailing_metadata_ready =
   2326       &retry_state->recv_trailing_metadata_ready;
   2327 }
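
         // Note: the recv_trailing_metadata op is what ultimately determines
         // the outcome of an attempt: its recv_trailing_metadata_ready
         // callback (see above) is where the attempt's status is extracted
         // and, when retries are enabled, maybe_retry() is consulted.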
   2328 
   2329 // Helper function used to start a recv_trailing_metadata batch.  This
   2330 // is used in the case where a recv_initial_metadata or recv_message
    2331 // op fails in a way that we know the call is over but the application
   2332 // has not yet started its own recv_trailing_metadata op.
   2333 static void start_internal_recv_trailing_metadata(grpc_call_element* elem) {
   2334   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   2335   call_data* calld = static_cast<call_data*>(elem->call_data);
   2336   if (grpc_client_channel_trace.enabled()) {
   2337     gpr_log(GPR_INFO,
   2338             "chand=%p calld=%p: call failed but recv_trailing_metadata not "
   2339             "started; starting it internally",
   2340             chand, calld);
   2341   }
   2342   subchannel_call_retry_state* retry_state =
   2343       static_cast<subchannel_call_retry_state*>(
   2344           grpc_connected_subchannel_call_get_parent_data(
   2345               calld->subchannel_call));
   2346   // Create batch_data with 2 refs, since this batch will be unreffed twice:
   2347   // once for the recv_trailing_metadata_ready callback when the subchannel
   2348   // batch returns, and again when we actually get a recv_trailing_metadata
   2349   // op from the surface.
   2350   subchannel_batch_data* batch_data =
   2351       batch_data_create(elem, 2, false /* set_on_complete */);
   2352   add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
   2353   retry_state->recv_trailing_metadata_internal_batch = batch_data;
   2354   // Note: This will release the call combiner.
   2355   grpc_subchannel_call_process_op(calld->subchannel_call, &batch_data->batch);
   2356 }
   2357 
   2358 // If there are any cached send ops that need to be replayed on the
   2359 // current subchannel call, creates and returns a new subchannel batch
   2360 // to replay those ops.  Otherwise, returns nullptr.
   2361 static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
   2362     grpc_call_element* elem, subchannel_call_retry_state* retry_state) {
   2363   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   2364   call_data* calld = static_cast<call_data*>(elem->call_data);
   2365   subchannel_batch_data* replay_batch_data = nullptr;
   2366   // send_initial_metadata.
   2367   if (calld->seen_send_initial_metadata &&
   2368       !retry_state->started_send_initial_metadata &&
   2369       !calld->pending_send_initial_metadata) {
   2370     if (grpc_client_channel_trace.enabled()) {
   2371       gpr_log(GPR_INFO,
   2372               "chand=%p calld=%p: replaying previously completed "
   2373               "send_initial_metadata op",
   2374               chand, calld);
   2375     }
   2376     replay_batch_data = batch_data_create(elem, 1, true /* set_on_complete */);
   2377     add_retriable_send_initial_metadata_op(calld, retry_state,
   2378                                            replay_batch_data);
   2379   }
   2380   // send_message.
   2381   // Note that we can only have one send_message op in flight at a time.
   2382   if (retry_state->started_send_message_count < calld->send_messages->size() &&
   2383       retry_state->started_send_message_count ==
   2384           retry_state->completed_send_message_count &&
   2385       !calld->pending_send_message) {
   2386     if (grpc_client_channel_trace.enabled()) {
   2387       gpr_log(GPR_INFO,
   2388               "chand=%p calld=%p: replaying previously completed "
   2389               "send_message op",
   2390               chand, calld);
   2391     }
   2392     if (replay_batch_data == nullptr) {
   2393       replay_batch_data =
   2394           batch_data_create(elem, 1, true /* set_on_complete */);
   2395     }
   2396     add_retriable_send_message_op(elem, retry_state, replay_batch_data);
   2397   }
   2398   // send_trailing_metadata.
   2399   // Note that we only add this op if we have no more send_message ops
   2400   // to start, since we can't send down any more send_message ops after
   2401   // send_trailing_metadata.
   2402   if (calld->seen_send_trailing_metadata &&
   2403       retry_state->started_send_message_count == calld->send_messages->size() &&
   2404       !retry_state->started_send_trailing_metadata &&
   2405       !calld->pending_send_trailing_metadata) {
   2406     if (grpc_client_channel_trace.enabled()) {
   2407       gpr_log(GPR_INFO,
   2408               "chand=%p calld=%p: replaying previously completed "
   2409               "send_trailing_metadata op",
   2410               chand, calld);
   2411     }
   2412     if (replay_batch_data == nullptr) {
   2413       replay_batch_data =
   2414           batch_data_create(elem, 1, true /* set_on_complete */);
   2415     }
   2416     add_retriable_send_trailing_metadata_op(calld, retry_state,
   2417                                             replay_batch_data);
   2418   }
   2419   return replay_batch_data;
   2420 }
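
         // Example (illustrative): if a retry attempt starts after the
         // previous attempt had already completed send_initial_metadata and
         // one send_message, the new subchannel call's retry_state starts out
         // empty, so this function returns a single batch replaying both
         // cached ops; ops that the surface has not yet finished sending are
         // instead picked up by add_subchannel_batches_for_pending_batches()
         // below.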
   2421 
    2422 // Creates subchannel batches for pending batches that still need to be
    2423 // started on this subchannel call, adding closures for them to *closures.
   2424 static void add_subchannel_batches_for_pending_batches(
   2425     grpc_call_element* elem, subchannel_call_retry_state* retry_state,
   2426     grpc_core::CallCombinerClosureList* closures) {
   2427   call_data* calld = static_cast<call_data*>(elem->call_data);
   2428   for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
   2429     pending_batch* pending = &calld->pending_batches[i];
   2430     grpc_transport_stream_op_batch* batch = pending->batch;
   2431     if (batch == nullptr) continue;
   2432     // Skip any batch that either (a) has already been started on this
   2433     // subchannel call or (b) we can't start yet because we're still
   2434     // replaying send ops that need to be completed first.
   2435     // TODO(roth): Note that if any one op in the batch can't be sent
   2436     // yet due to ops that we're replaying, we don't start any of the ops
   2437     // in the batch.  This is probably okay, but it could conceivably
   2438     // lead to increased latency in some cases -- e.g., we could delay
   2439     // starting a recv op due to it being in the same batch with a send
   2440     // op.  If/when we revamp the callback protocol in
   2441     // transport_stream_op_batch, we may be able to fix this.
   2442     if (batch->send_initial_metadata &&
   2443         retry_state->started_send_initial_metadata) {
   2444       continue;
   2445     }
   2446     if (batch->send_message && retry_state->completed_send_message_count <
   2447                                    retry_state->started_send_message_count) {
   2448       continue;
   2449     }
   2450     // Note that we only start send_trailing_metadata if we have no more
   2451     // send_message ops to start, since we can't send down any more
   2452     // send_message ops after send_trailing_metadata.
   2453     if (batch->send_trailing_metadata &&
   2454         (retry_state->started_send_message_count + batch->send_message <
   2455              calld->send_messages->size() ||
   2456          retry_state->started_send_trailing_metadata)) {
   2457       continue;
   2458     }
   2459     if (batch->recv_initial_metadata &&
   2460         retry_state->started_recv_initial_metadata) {
   2461       continue;
   2462     }
   2463     if (batch->recv_message && retry_state->completed_recv_message_count <
   2464                                    retry_state->started_recv_message_count) {
   2465       continue;
   2466     }
   2467     if (batch->recv_trailing_metadata &&
   2468         retry_state->started_recv_trailing_metadata) {
   2469       // If we previously completed a recv_trailing_metadata op
   2470       // initiated by start_internal_recv_trailing_metadata(), use the
   2471       // result of that instead of trying to re-start this op.
    2472       if (GPR_UNLIKELY(retry_state->recv_trailing_metadata_internal_batch !=
    2473                        nullptr)) {
   2474         // If the batch completed, then trigger the completion callback
   2475         // directly, so that we return the previously returned results to
   2476         // the application.  Otherwise, just unref the internally
   2477         // started subchannel batch, since we'll propagate the
   2478         // completion when it completes.
   2479         if (retry_state->completed_recv_trailing_metadata) {
   2480           // Batches containing recv_trailing_metadata always succeed.
   2481           closures->Add(
   2482               &retry_state->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
   2483               "re-executing recv_trailing_metadata_ready to propagate "
   2484               "internally triggered result");
   2485         } else {
   2486           batch_data_unref(retry_state->recv_trailing_metadata_internal_batch);
   2487         }
   2488         retry_state->recv_trailing_metadata_internal_batch = nullptr;
   2489       }
   2490       continue;
   2491     }
   2492     // If we're not retrying, just send the batch as-is.
   2493     if (calld->method_params == nullptr ||
   2494         calld->method_params->retry_policy() == nullptr ||
   2495         calld->retry_committed) {
   2496       add_closure_for_subchannel_batch(elem, batch, closures);
   2497       pending_batch_clear(calld, pending);
   2498       continue;
   2499     }
   2500     // Create batch with the right number of callbacks.
   2501     const bool has_send_ops = batch->send_initial_metadata ||
   2502                               batch->send_message ||
   2503                               batch->send_trailing_metadata;
   2504     const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
   2505                               batch->recv_message +
   2506                               batch->recv_trailing_metadata;
   2507     subchannel_batch_data* batch_data = batch_data_create(
   2508         elem, num_callbacks, has_send_ops /* set_on_complete */);
   2509     // Cache send ops if needed.
   2510     maybe_cache_send_ops_for_batch(calld, pending);
   2511     // send_initial_metadata.
   2512     if (batch->send_initial_metadata) {
   2513       add_retriable_send_initial_metadata_op(calld, retry_state, batch_data);
   2514     }
   2515     // send_message.
   2516     if (batch->send_message) {
   2517       add_retriable_send_message_op(elem, retry_state, batch_data);
   2518     }
   2519     // send_trailing_metadata.
   2520     if (batch->send_trailing_metadata) {
   2521       add_retriable_send_trailing_metadata_op(calld, retry_state, batch_data);
   2522     }
   2523     // recv_initial_metadata.
   2524     if (batch->recv_initial_metadata) {
   2525       // recv_flags is only used on the server side.
   2526       GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr);
   2527       add_retriable_recv_initial_metadata_op(calld, retry_state, batch_data);
   2528     }
   2529     // recv_message.
   2530     if (batch->recv_message) {
   2531       add_retriable_recv_message_op(calld, retry_state, batch_data);
   2532     }
   2533     // recv_trailing_metadata.
   2534     if (batch->recv_trailing_metadata) {
   2535       add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
   2536     }
   2537     add_closure_for_subchannel_batch(elem, &batch_data->batch, closures);
   2538     // Track number of pending subchannel send batches.
   2539     // If this is the first one, take a ref to the call stack.
   2540     if (batch->send_initial_metadata || batch->send_message ||
   2541         batch->send_trailing_metadata) {
   2542       if (calld->num_pending_retriable_subchannel_send_batches == 0) {
   2543         GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches");
   2544       }
   2545       ++calld->num_pending_retriable_subchannel_send_batches;
   2546     }
   2547   }
   2548 }
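
         // Note on the callback counting above: batch_data is created with
         // one ref per expected callback -- a single on_complete shared by
         // all send ops in the batch (if any), plus one for each recv op,
         // since each recv op completes through its own ..._ready closure
         // rather than through on_complete.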
   2549 
   2550 // Constructs and starts whatever subchannel batches are needed on the
   2551 // subchannel call.
   2552 static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
   2553   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
   2554   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   2555   call_data* calld = static_cast<call_data*>(elem->call_data);
   2556   if (grpc_client_channel_trace.enabled()) {
   2557     gpr_log(GPR_INFO, "chand=%p calld=%p: constructing retriable batches",
   2558             chand, calld);
   2559   }
   2560   subchannel_call_retry_state* retry_state =
   2561       static_cast<subchannel_call_retry_state*>(
   2562           grpc_connected_subchannel_call_get_parent_data(
   2563               calld->subchannel_call));
    2564   // Construct the list of closures to execute (one per batch we start).
   2565   grpc_core::CallCombinerClosureList closures;
   2566   // Replay previously-returned send_* ops if needed.
   2567   subchannel_batch_data* replay_batch_data =
   2568       maybe_create_subchannel_batch_for_replay(elem, retry_state);
   2569   if (replay_batch_data != nullptr) {
   2570     add_closure_for_subchannel_batch(elem, &replay_batch_data->batch,
   2571                                      &closures);
   2572     // Track number of pending subchannel send batches.
   2573     // If this is the first one, take a ref to the call stack.
   2574     if (calld->num_pending_retriable_subchannel_send_batches == 0) {
   2575       GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches");
   2576     }
   2577     ++calld->num_pending_retriable_subchannel_send_batches;
   2578   }
   2579   // Now add pending batches.
   2580   add_subchannel_batches_for_pending_batches(elem, retry_state, &closures);
   2581   // Start batches on subchannel call.
   2582   if (grpc_client_channel_trace.enabled()) {
   2583     gpr_log(GPR_INFO,
   2584             "chand=%p calld=%p: starting %" PRIuPTR
   2585             " retriable batches on subchannel_call=%p",
   2586             chand, calld, closures.size(), calld->subchannel_call);
   2587   }
   2588   // Note: This will yield the call combiner.
   2589   closures.RunClosures(calld->call_combiner);
   2590 }
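
         // Note: the replay batch (if any) is added to the closure list ahead
         // of the pending batches, so previously-sent ops are handed back to
         // the new subchannel call before any ops being started for the first
         // time.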
   2591 
   2592 //
   2593 // Channelz
   2594 //
   2595 
   2596 static void recv_trailing_metadata_ready_channelz(void* arg,
   2597                                                   grpc_error* error) {
   2598   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
   2599   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   2600   call_data* calld = static_cast<call_data*>(elem->call_data);
   2601   if (grpc_client_channel_trace.enabled()) {
   2602     gpr_log(GPR_INFO,
   2603             "chand=%p calld=%p: got recv_trailing_metadata_ready_channelz, "
   2604             "error=%s",
   2605             chand, calld, grpc_error_string(error));
   2606   }
   2607   GPR_ASSERT(calld->recv_trailing_metadata != nullptr);
   2608   grpc_status_code status = GRPC_STATUS_OK;
   2609   grpc_metadata_batch* md_batch = calld->recv_trailing_metadata;
   2610   get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, nullptr);
   2611   grpc_core::channelz::SubchannelNode* channelz_subchannel =
   2612       calld->pick.connected_subchannel->channelz_subchannel();
   2613   GPR_ASSERT(channelz_subchannel != nullptr);
   2614   if (status == GRPC_STATUS_OK) {
   2615     channelz_subchannel->RecordCallSucceeded();
   2616   } else {
   2617     channelz_subchannel->RecordCallFailed();
   2618   }
   2619   calld->recv_trailing_metadata = nullptr;
   2620   GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata, error);
   2621 }
   2622 
    2623 // If channelz is enabled, intercept recv_trailing_metadata so that we may
    2624 // check the call's final status and associate it with a subchannel.
   2626 static void maybe_intercept_recv_trailing_metadata_for_channelz(
   2627     grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
   2628   call_data* calld = static_cast<call_data*>(elem->call_data);
    2629   // Only intercept batches that include recv_trailing_metadata.
   2630   if (!batch->recv_trailing_metadata) {
   2631     return;
   2632   }
    2633   // Only add the interceptor if channelz is enabled.
   2634   if (calld->pick.connected_subchannel->channelz_subchannel() == nullptr) {
   2635     return;
   2636   }
   2637   if (grpc_client_channel_trace.enabled()) {
   2638     gpr_log(GPR_INFO,
   2639             "calld=%p batch=%p: intercepting recv trailing for channelz", calld,
   2640             batch);
   2641   }
   2642   GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_channelz,
   2643                     recv_trailing_metadata_ready_channelz, elem,
   2644                     grpc_schedule_on_exec_ctx);
    2645   // Save some state needed for the interception callback.
   2646   GPR_ASSERT(calld->recv_trailing_metadata == nullptr);
   2647   calld->recv_trailing_metadata =
   2648       batch->payload->recv_trailing_metadata.recv_trailing_metadata;
   2649   calld->original_recv_trailing_metadata =
   2650       batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
   2651   batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
   2652       &calld->recv_trailing_metadata_ready_channelz;
   2653 }
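
         // The interception above follows the closure-wrapping pattern used
         // throughout this file.  As a minimal sketch (illustrative names,
         // not actual code):
         //
         //   // Swap in our closure, remembering the original one.
         //   saved_ready = payload->recv_trailing_metadata_ready;
         //   payload->recv_trailing_metadata_ready = &our_closure;
         //   ...
         //   // In our callback: record the result, then chain to the
         //   // original closure.
         //   channelz_subchannel->RecordCallSucceeded();  // or ...Failed()
         //   GRPC_CLOSURE_RUN(saved_ready, error);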
   2654 
   2655 //
   2656 // LB pick
   2657 //
   2658 
   2659 static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) {
   2660   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   2661   call_data* calld = static_cast<call_data*>(elem->call_data);
   2662   const size_t parent_data_size =
   2663       calld->enable_retries ? sizeof(subchannel_call_retry_state) : 0;
   2664   const grpc_core::ConnectedSubchannel::CallArgs call_args = {
   2665       calld->pollent,                       // pollent
   2666       calld->path,                          // path
   2667       calld->call_start_time,               // start_time
   2668       calld->deadline,                      // deadline
   2669       calld->arena,                         // arena
   2670       calld->pick.subchannel_call_context,  // context
   2671       calld->call_combiner,                 // call_combiner
   2672       parent_data_size                      // parent_data_size
   2673   };
   2674   grpc_error* new_error = calld->pick.connected_subchannel->CreateCall(
   2675       call_args, &calld->subchannel_call);
   2676   if (grpc_client_channel_trace.enabled()) {
   2677     gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
   2678             chand, calld, calld->subchannel_call, grpc_error_string(new_error));
   2679   }
   2680   if (GPR_UNLIKELY(new_error != GRPC_ERROR_NONE)) {
   2681     new_error = grpc_error_add_child(new_error, error);
   2682     pending_batches_fail(elem, new_error, true /* yield_call_combiner */);
   2683   } else {
   2684     grpc_core::channelz::SubchannelNode* channelz_subchannel =
   2685         calld->pick.connected_subchannel->channelz_subchannel();
   2686     if (channelz_subchannel != nullptr) {
   2687       channelz_subchannel->RecordCallStarted();
   2688     }
   2689     if (parent_data_size > 0) {
   2690       subchannel_call_retry_state* retry_state =
   2691           static_cast<subchannel_call_retry_state*>(
   2692               grpc_connected_subchannel_call_get_parent_data(
   2693                   calld->subchannel_call));
   2694       retry_state->batch_payload.context = calld->pick.subchannel_call_context;
   2695     }
   2696     pending_batches_resume(elem);
   2697   }
   2698   GRPC_ERROR_UNREF(error);
   2699 }
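
         // Note: when retries are enabled, parent_data_size above reserves
         // space for a subchannel_call_retry_state alongside the subchannel
         // call; grpc_connected_subchannel_call_get_parent_data() is how the
         // retry code retrieves that per-attempt state.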
   2700 
    2701 // Invoked when a pick is completed, on either success or failure.
   2702 static void pick_done(void* arg, grpc_error* error) {
   2703   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
   2704   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   2705   call_data* calld = static_cast<call_data*>(elem->call_data);
   2706   if (GPR_UNLIKELY(calld->pick.connected_subchannel == nullptr)) {
   2707     // Failed to create subchannel.
    2708     // If there was no error, this is an LB policy drop, in which case
    2709     // we fail the call; otherwise, we may attempt a retry before failing.
   2710     grpc_status_code status = GRPC_STATUS_OK;
   2711     grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr,
   2712                           nullptr);
   2713     if (error == GRPC_ERROR_NONE || !calld->enable_retries ||
   2714         !maybe_retry(elem, nullptr /* batch_data */, status,
   2715                      nullptr /* server_pushback_md */)) {
   2716       grpc_error* new_error =
   2717           error == GRPC_ERROR_NONE
   2718               ? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
   2719                     "Call dropped by load balancing policy")
   2720               : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
   2721                     "Failed to create subchannel", &error, 1);
   2722       if (grpc_client_channel_trace.enabled()) {
   2723         gpr_log(GPR_INFO,
   2724                 "chand=%p calld=%p: failed to create subchannel: error=%s",
   2725                 chand, calld, grpc_error_string(new_error));
   2726       }
   2727       pending_batches_fail(elem, new_error, true /* yield_call_combiner */);
   2728     }
   2729   } else {
   2730     /* Create call on subchannel. */
   2731     create_subchannel_call(elem, GRPC_ERROR_REF(error));
   2732   }
   2733 }
   2734 
   2735 static void maybe_add_call_to_channel_interested_parties_locked(
   2736     grpc_call_element* elem) {
   2737   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   2738   call_data* calld = static_cast<call_data*>(elem->call_data);
   2739   if (!calld->pollent_added_to_interested_parties) {
   2740     calld->pollent_added_to_interested_parties = true;
   2741     grpc_polling_entity_add_to_pollset_set(calld->pollent,
   2742                                            chand->interested_parties);
   2743   }
   2744 }
   2745 
   2746 static void maybe_del_call_from_channel_interested_parties_locked(
   2747     grpc_call_element* elem) {
   2748   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   2749   call_data* calld = static_cast<call_data*>(elem->call_data);
   2750   if (calld->pollent_added_to_interested_parties) {
   2751     calld->pollent_added_to_interested_parties = false;
   2752     grpc_polling_entity_del_from_pollset_set(calld->pollent,
   2753                                              chand->interested_parties);
   2754   }
   2755 }
   2756 
   2757 // Invoked when a pick is completed to leave the client_channel combiner
   2758 // and continue processing in the call combiner.
   2759 // If needed, removes the call's polling entity from chand->interested_parties.
   2760 static void pick_done_locked(grpc_call_element* elem, grpc_error* error) {
   2761   call_data* calld = static_cast<call_data*>(elem->call_data);
   2762   maybe_del_call_from_channel_interested_parties_locked(elem);
   2763   GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem,
   2764                     grpc_schedule_on_exec_ctx);
   2765   GRPC_CLOSURE_SCHED(&calld->pick_closure, error);
   2766 }
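
         // Note: scheduling pick_done() on the exec_ctx (rather than running
         // it under the channel combiner) means that creating the subchannel
         // call and resuming or failing pending batches do not serialize on
         // the channel-wide combiner; they proceed under the call combiner
         // alone, which is cheaper on busy channels.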
   2767 
   2768 namespace grpc_core {
   2769 
   2770 // Performs subchannel pick via LB policy.
   2771 class LbPicker {
   2772  public:
   2773   // Starts a pick on chand->lb_policy.
   2774   static void StartLocked(grpc_call_element* elem) {
   2775     channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   2776     call_data* calld = static_cast<call_data*>(elem->call_data);
   2777     if (grpc_client_channel_trace.enabled()) {
   2778       gpr_log(GPR_INFO, "chand=%p calld=%p: starting pick on lb_policy=%p",
   2779               chand, calld, chand->lb_policy.get());
   2780     }
   2781     // If this is a retry, use the send_initial_metadata payload that
   2782     // we've cached; otherwise, use the pending batch.  The
   2783     // send_initial_metadata batch will be the first pending batch in the
   2784     // list, as set by get_batch_index() above.
   2785     calld->pick.initial_metadata =
   2786         calld->seen_send_initial_metadata
   2787             ? &calld->send_initial_metadata
   2788             : calld->pending_batches[0]
   2789                   .batch->payload->send_initial_metadata.send_initial_metadata;
   2790     calld->pick.initial_metadata_flags =
   2791         calld->seen_send_initial_metadata
   2792             ? calld->send_initial_metadata_flags
   2793             : calld->pending_batches[0]
   2794                   .batch->payload->send_initial_metadata
   2795                   .send_initial_metadata_flags;
   2796     GRPC_CLOSURE_INIT(&calld->pick_closure, &LbPicker::DoneLocked, elem,
   2797                       grpc_combiner_scheduler(chand->combiner));
   2798     calld->pick.on_complete = &calld->pick_closure;
   2799     GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback");
   2800     grpc_error* error = GRPC_ERROR_NONE;
   2801     const bool pick_done = chand->lb_policy->PickLocked(&calld->pick, &error);
   2802     if (GPR_LIKELY(pick_done)) {
   2803       // Pick completed synchronously.
   2804       if (grpc_client_channel_trace.enabled()) {
   2805         gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed synchronously",
   2806                 chand, calld);
   2807       }
   2808       pick_done_locked(elem, error);
   2809       GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
   2810     } else {
   2811       // Pick will be returned asynchronously.
   2812       // Add the polling entity from call_data to the channel_data's
   2813       // interested_parties, so that the I/O of the LB policy can be done
   2814       // under it.  It will be removed in pick_done_locked().
   2815       maybe_add_call_to_channel_interested_parties_locked(elem);
   2816       // Request notification on call cancellation.
   2817       GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel");
   2818       grpc_call_combiner_set_notify_on_cancel(
   2819           calld->call_combiner,
   2820           GRPC_CLOSURE_INIT(&calld->pick_cancel_closure,
   2821                             &LbPicker::CancelLocked, elem,
   2822                             grpc_combiner_scheduler(chand->combiner)));
   2823     }
   2824   }
   2825 
   2826  private:
   2827   // Callback invoked by LoadBalancingPolicy::PickLocked() for async picks.
    2828   // Invokes pick_done_locked() and unrefs the call stack.
   2829   static void DoneLocked(void* arg, grpc_error* error) {
   2830     grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
   2831     channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   2832     call_data* calld = static_cast<call_data*>(elem->call_data);
   2833     if (grpc_client_channel_trace.enabled()) {
   2834       gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed asynchronously",
   2835               chand, calld);
   2836     }
   2837     pick_done_locked(elem, GRPC_ERROR_REF(error));
   2838     GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
   2839   }
   2840 
   2841   // Note: This runs under the client_channel combiner, but will NOT be
   2842   // holding the call combiner.
   2843   static void CancelLocked(void* arg, grpc_error* error) {
   2844     grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
   2845     channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   2846     call_data* calld = static_cast<call_data*>(elem->call_data);
   2847     // Note: chand->lb_policy may have changed since we started our pick,
   2848     // in which case we will be cancelling the pick on a policy other than
   2849     // the one we started it on.  However, this will just be a no-op.
   2850     if (GPR_UNLIKELY(error != GRPC_ERROR_NONE && chand->lb_policy != nullptr)) {
   2851       if (grpc_client_channel_trace.enabled()) {
   2852         gpr_log(GPR_INFO,
   2853                 "chand=%p calld=%p: cancelling pick from LB policy %p", chand,
   2854                 calld, chand->lb_policy.get());
   2855       }
   2856       chand->lb_policy->CancelPickLocked(&calld->pick, GRPC_ERROR_REF(error));
   2857     }
   2858     GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel");
   2859   }
   2860 };
   2861 
   2862 }  // namespace grpc_core
   2863 
   2864 // Applies service config to the call.  Must be invoked once we know
   2865 // that the resolver has returned results to the channel.
   2866 static void apply_service_config_to_call_locked(grpc_call_element* elem) {
   2867   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   2868   call_data* calld = static_cast<call_data*>(elem->call_data);
   2869   if (grpc_client_channel_trace.enabled()) {
   2870     gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
   2871             chand, calld);
   2872   }
   2873   if (chand->retry_throttle_data != nullptr) {
   2874     calld->retry_throttle_data = chand->retry_throttle_data->Ref();
   2875   }
   2876   if (chand->method_params_table != nullptr) {
   2877     calld->method_params = grpc_core::ServiceConfig::MethodConfigTableLookup(
   2878         *chand->method_params_table, calld->path);
   2879     if (calld->method_params != nullptr) {
   2880       // If the deadline from the service config is shorter than the one
   2881       // from the client API, reset the deadline timer.
   2882       if (chand->deadline_checking_enabled &&
   2883           calld->method_params->timeout() != 0) {
   2884         const grpc_millis per_method_deadline =
   2885             grpc_timespec_to_millis_round_up(calld->call_start_time) +
   2886             calld->method_params->timeout();
   2887         if (per_method_deadline < calld->deadline) {
   2888           calld->deadline = per_method_deadline;
   2889           grpc_deadline_state_reset(elem, calld->deadline);
   2890         }
   2891       }
   2892       // If the service config set wait_for_ready and the application
   2893       // did not explicitly set it, use the value from the service config.
   2894       uint32_t* send_initial_metadata_flags =
   2895           &calld->pending_batches[0]
   2896                .batch->payload->send_initial_metadata
   2897                .send_initial_metadata_flags;
   2898       if (GPR_UNLIKELY(
   2899               calld->method_params->wait_for_ready() !=
   2900                   ClientChannelMethodParams::WAIT_FOR_READY_UNSET &&
   2901               !(*send_initial_metadata_flags &
   2902                 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET))) {
   2903         if (calld->method_params->wait_for_ready() ==
   2904             ClientChannelMethodParams::WAIT_FOR_READY_TRUE) {
   2905           *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
   2906         } else {
   2907           *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
   2908         }
   2909       }
   2910     }
   2911   }
   2912   // If no retry policy, disable retries.
   2913   // TODO(roth): Remove this when adding support for transparent retries.
   2914   if (calld->method_params == nullptr ||
   2915       calld->method_params->retry_policy() == nullptr) {
   2916     calld->enable_retries = false;
   2917   }
   2918 }
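
         // For reference, the per-method values consulted above come from the
         // service config.  An illustrative example (exact fields depend on
         // what the service config parser supports):
         //
         //   {
         //     "methodConfig": [ {
         //       "name": [ { "service": "example.EchoService" } ],
         //       "timeout": "1.5s",
         //       "waitForReady": true,
         //       "retryPolicy": {
         //         "maxAttempts": 3,
         //         "initialBackoff": "0.1s",
         //         "maxBackoff": "1s",
         //         "backoffMultiplier": 2,
         //         "retryableStatusCodes": [ "UNAVAILABLE" ]
         //       }
         //     } ]
         //   }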
   2919 
   2920 // Invoked once resolver results are available.
   2921 static void process_service_config_and_start_lb_pick_locked(
   2922     grpc_call_element* elem) {
   2923   call_data* calld = static_cast<call_data*>(elem->call_data);
   2924   // Only get service config data on the first attempt.
   2925   if (GPR_LIKELY(calld->num_attempts_completed == 0)) {
   2926     apply_service_config_to_call_locked(elem);
   2927   }
   2928   // Start LB pick.
   2929   grpc_core::LbPicker::StartLocked(elem);
   2930 }
   2931 
   2932 namespace grpc_core {
   2933 
   2934 // Handles waiting for a resolver result.
    2935 // Used for calls that arrive before the channel has an LB policy.
   2936 class ResolverResultWaiter {
   2937  public:
   2938   explicit ResolverResultWaiter(grpc_call_element* elem) : elem_(elem) {
   2939     channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   2940     call_data* calld = static_cast<call_data*>(elem->call_data);
   2941     if (grpc_client_channel_trace.enabled()) {
   2942       gpr_log(GPR_INFO,
   2943               "chand=%p calld=%p: deferring pick pending resolver result",
   2944               chand, calld);
   2945     }
   2946     // Add closure to be run when a resolver result is available.
   2947     GRPC_CLOSURE_INIT(&done_closure_, &ResolverResultWaiter::DoneLocked, this,
   2948                       grpc_combiner_scheduler(chand->combiner));
   2949     AddToWaitingList();
   2950     // Set cancellation closure, so that we abort if the call is cancelled.
   2951     GRPC_CLOSURE_INIT(&cancel_closure_, &ResolverResultWaiter::CancelLocked,
   2952                       this, grpc_combiner_scheduler(chand->combiner));
   2953     grpc_call_combiner_set_notify_on_cancel(calld->call_combiner,
   2954                                             &cancel_closure_);
   2955   }
   2956 
   2957  private:
    2958   // Adds done_closure_ to chand->waiting_for_resolver_result_closures.
   2959   void AddToWaitingList() {
   2960     channel_data* chand = static_cast<channel_data*>(elem_->channel_data);
   2961     grpc_closure_list_append(&chand->waiting_for_resolver_result_closures,
   2962                              &done_closure_, GRPC_ERROR_NONE);
   2963   }
   2964 
   2965   // Invoked when a resolver result is available.
   2966   static void DoneLocked(void* arg, grpc_error* error) {
   2967     ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg);
   2968     // If CancelLocked() has already run, delete ourselves without doing
   2969     // anything.  Note that the call stack may have already been destroyed,
   2970     // so it's not safe to access anything in elem_.
   2971     if (GPR_UNLIKELY(self->finished_)) {
   2972       if (grpc_client_channel_trace.enabled()) {
   2973         gpr_log(GPR_INFO, "call cancelled before resolver result");
   2974       }
   2975       Delete(self);
   2976       return;
   2977     }
   2978     // Otherwise, process the resolver result.
   2979     grpc_call_element* elem = self->elem_;
   2980     channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   2981     call_data* calld = static_cast<call_data*>(elem->call_data);
   2982     if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
   2983       if (grpc_client_channel_trace.enabled()) {
   2984         gpr_log(GPR_INFO, "chand=%p calld=%p: resolver failed to return data",
   2985                 chand, calld);
   2986       }
   2987       pick_done_locked(elem, GRPC_ERROR_REF(error));
   2988     } else if (GPR_UNLIKELY(chand->resolver == nullptr)) {
   2989       // Shutting down.
   2990       if (grpc_client_channel_trace.enabled()) {
   2991         gpr_log(GPR_INFO, "chand=%p calld=%p: resolver disconnected", chand,
   2992                 calld);
   2993       }
   2994       pick_done_locked(elem,
   2995                        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
   2996     } else if (GPR_UNLIKELY(chand->lb_policy == nullptr)) {
   2997       // Transient resolver failure.
   2998       // If call has wait_for_ready=true, try again; otherwise, fail.
   2999       uint32_t send_initial_metadata_flags =
   3000           calld->seen_send_initial_metadata
   3001               ? calld->send_initial_metadata_flags
   3002               : calld->pending_batches[0]
   3003                     .batch->payload->send_initial_metadata
   3004                     .send_initial_metadata_flags;
   3005       if (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) {
   3006         if (grpc_client_channel_trace.enabled()) {
   3007           gpr_log(GPR_INFO,
   3008                   "chand=%p calld=%p: resolver returned but no LB policy; "
   3009                   "wait_for_ready=true; trying again",
   3010                   chand, calld);
   3011         }
   3012         // Re-add ourselves to the waiting list.
   3013         self->AddToWaitingList();
   3014         // Return early so that we don't set finished_ to true below.
   3015         return;
   3016       } else {
   3017         if (grpc_client_channel_trace.enabled()) {
   3018           gpr_log(GPR_INFO,
   3019                   "chand=%p calld=%p: resolver returned but no LB policy; "
   3020                   "wait_for_ready=false; failing",
   3021                   chand, calld);
   3022         }
   3023         pick_done_locked(
   3024             elem,
   3025             grpc_error_set_int(
   3026                 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Name resolution failure"),
   3027                 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
   3028       }
   3029     } else {
   3030       if (grpc_client_channel_trace.enabled()) {
   3031         gpr_log(GPR_INFO, "chand=%p calld=%p: resolver returned, doing LB pick",
   3032                 chand, calld);
   3033       }
   3034       process_service_config_and_start_lb_pick_locked(elem);
   3035     }
   3036     self->finished_ = true;
   3037   }
   3038 
   3039   // Invoked when the call is cancelled.
   3040   // Note: This runs under the client_channel combiner, but will NOT be
   3041   // holding the call combiner.
   3042   static void CancelLocked(void* arg, grpc_error* error) {
   3043     ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg);
   3044     // If DoneLocked() has already run, delete ourselves without doing anything.
   3045     if (GPR_LIKELY(self->finished_)) {
   3046       Delete(self);
   3047       return;
   3048     }
   3049     // If we are being cancelled, immediately invoke pick_done_locked()
   3050     // to propagate the error back to the caller.
   3051     if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
   3052       grpc_call_element* elem = self->elem_;
   3053       channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   3054       call_data* calld = static_cast<call_data*>(elem->call_data);
   3055       if (grpc_client_channel_trace.enabled()) {
   3056         gpr_log(GPR_INFO,
   3057                 "chand=%p calld=%p: cancelling call waiting for name "
   3058                 "resolution",
   3059                 chand, calld);
   3060       }
   3061       // Note: Although we are not in the call combiner here, we are
   3062       // basically stealing the call combiner from the pending pick, so
   3063       // it's safe to call pick_done_locked() here -- we are essentially
   3064       // calling it here instead of calling it in DoneLocked().
   3065       pick_done_locked(elem, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
   3066                                  "Pick cancelled", &error, 1));
   3067     }
   3068     self->finished_ = true;
   3069   }
   3070 
   3071   grpc_call_element* elem_;
   3072   grpc_closure done_closure_;
   3073   grpc_closure cancel_closure_;
   3074   bool finished_ = false;
   3075 };
   3076 
   3077 }  // namespace grpc_core
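
         // Lifecycle note for ResolverResultWaiter: DoneLocked() and
         // CancelLocked() both run under the client_channel combiner, so they
         // never run concurrently; whichever of them runs second observes
         // finished_ == true and deletes the waiter.  The one exception is
         // the wait_for_ready re-queue path in DoneLocked(), which returns
         // before setting finished_ so that the waiter stays alive for the
         // next resolver result.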
   3078 
   3079 static void start_pick_locked(void* arg, grpc_error* ignored) {
   3080   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
   3081   call_data* calld = static_cast<call_data*>(elem->call_data);
   3082   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   3083   GPR_ASSERT(calld->pick.connected_subchannel == nullptr);
   3084   GPR_ASSERT(calld->subchannel_call == nullptr);
   3085   if (GPR_LIKELY(chand->lb_policy != nullptr)) {
   3086     // We already have resolver results, so process the service config
   3087     // and start an LB pick.
   3088     process_service_config_and_start_lb_pick_locked(elem);
   3089   } else if (GPR_UNLIKELY(chand->resolver == nullptr)) {
   3090     pick_done_locked(elem,
   3091                      GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
   3092   } else {
   3093     // We do not yet have an LB policy, so wait for a resolver result.
   3094     if (GPR_UNLIKELY(!chand->started_resolving)) {
   3095       start_resolving_locked(chand);
   3096     }
   3097     // Create a new waiter, which will delete itself when done.
   3098     grpc_core::New<grpc_core::ResolverResultWaiter>(elem);
   3099     // Add the polling entity from call_data to the channel_data's
   3100     // interested_parties, so that the I/O of the resolver can be done
   3101     // under it.  It will be removed in pick_done_locked().
   3102     maybe_add_call_to_channel_interested_parties_locked(elem);
   3103   }
   3104 }
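
         // To summarize the three cases above:
         // 1. We already have an LB policy: apply the service config (on the
         //    first attempt only) and start the pick immediately.
         // 2. The resolver has shut down: fail the pick with "Disconnected".
         // 3. Otherwise: start resolving if we haven't already, and queue the
         //    pick behind the resolver result via ResolverResultWaiter.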
   3105 
   3106 //
   3107 // filter call vtable functions
   3108 //
   3109 
   3110 static void cc_start_transport_stream_op_batch(
   3111     grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
   3112   GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
   3113   call_data* calld = static_cast<call_data*>(elem->call_data);
   3114   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   3115   if (GPR_LIKELY(chand->deadline_checking_enabled)) {
   3116     grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
   3117   }
   3118   // If we've previously been cancelled, immediately fail any new batches.
   3119   if (GPR_UNLIKELY(calld->cancel_error != GRPC_ERROR_NONE)) {
   3120     if (grpc_client_channel_trace.enabled()) {
   3121       gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
   3122               chand, calld, grpc_error_string(calld->cancel_error));
   3123     }
   3124     // Note: This will release the call combiner.
   3125     grpc_transport_stream_op_batch_finish_with_failure(
   3126         batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
   3127     return;
   3128   }
   3129   // Handle cancellation.
   3130   if (GPR_UNLIKELY(batch->cancel_stream)) {
   3131     // Stash a copy of cancel_error in our call data, so that we can use
   3132     // it for subsequent operations.  This ensures that if the call is
   3133     // cancelled before any batches are passed down (e.g., if the deadline
   3134     // is in the past when the call starts), we can return the right
   3135     // error to the caller when the first batch does get passed down.
   3136     GRPC_ERROR_UNREF(calld->cancel_error);
   3137     calld->cancel_error =
   3138         GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
   3139     if (grpc_client_channel_trace.enabled()) {
   3140       gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
   3141               calld, grpc_error_string(calld->cancel_error));
   3142     }
   3143     // If we do not have a subchannel call (i.e., a pick has not yet
   3144     // been started), fail all pending batches.  Otherwise, send the
   3145     // cancellation down to the subchannel call.
   3146     if (calld->subchannel_call == nullptr) {
   3147       pending_batches_fail(elem, GRPC_ERROR_REF(calld->cancel_error),
   3148                            false /* yield_call_combiner */);
   3149       // Note: This will release the call combiner.
   3150       grpc_transport_stream_op_batch_finish_with_failure(
   3151           batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
   3152     } else {
   3153       // Note: This will release the call combiner.
   3154       grpc_subchannel_call_process_op(calld->subchannel_call, batch);
   3155     }
   3156     return;
   3157   }
   3158   // Add the batch to the pending list.
   3159   pending_batches_add(elem, batch);
   3160   // Check if we've already gotten a subchannel call.
   3161   // Note that once we have completed the pick, we do not need to enter
   3162   // the channel combiner, which is more efficient (especially for
   3163   // streaming calls).
   3164   if (calld->subchannel_call != nullptr) {
   3165     if (grpc_client_channel_trace.enabled()) {
   3166       gpr_log(GPR_INFO,
   3167               "chand=%p calld=%p: starting batch on subchannel_call=%p", chand,
   3168               calld, calld->subchannel_call);
   3169     }
   3170     pending_batches_resume(elem);
   3171     return;
   3172   }
   3173   // We do not yet have a subchannel call.
   3174   // For batches containing a send_initial_metadata op, enter the channel
   3175   // combiner to start a pick.
   3176   if (GPR_LIKELY(batch->send_initial_metadata)) {
   3177     if (grpc_client_channel_trace.enabled()) {
   3178       gpr_log(GPR_INFO, "chand=%p calld=%p: entering client_channel combiner",
   3179               chand, calld);
   3180     }
   3181     GRPC_CLOSURE_SCHED(
   3182         GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_pick_locked,
   3183                           elem, grpc_combiner_scheduler(chand->combiner)),
   3184         GRPC_ERROR_NONE);
   3185   } else {
   3186     // For all other batches, release the call combiner.
   3187     if (grpc_client_channel_trace.enabled()) {
   3188       gpr_log(GPR_INFO,
   3189               "chand=%p calld=%p: saved batch, yielding call combiner", chand,
   3190               calld);
   3191     }
   3192     GRPC_CALL_COMBINER_STOP(calld->call_combiner,
   3193                             "batch does not include send_initial_metadata");
   3194   }
   3195 }
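
         // To summarize the flow above: every non-cancel batch is buffered in
         // calld->pending_batches.  Once a subchannel call exists, batches
         // are resumed on it directly, without entering the channel combiner;
         // until then, only a batch carrying send_initial_metadata triggers a
         // pick, and all other batches simply yield the call combiner and
         // wait.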
   3196 
   3197 /* Constructor for call_data */
   3198 static grpc_error* cc_init_call_elem(grpc_call_element* elem,
   3199                                      const grpc_call_element_args* args) {
   3200   call_data* calld = static_cast<call_data*>(elem->call_data);
   3201   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   3202   // Initialize data members.
   3203   calld->path = grpc_slice_ref_internal(args->path);
   3204   calld->call_start_time = args->start_time;
   3205   calld->deadline = args->deadline;
   3206   calld->arena = args->arena;
   3207   calld->owning_call = args->call_stack;
   3208   calld->call_combiner = args->call_combiner;
   3209   if (GPR_LIKELY(chand->deadline_checking_enabled)) {
   3210     grpc_deadline_state_init(elem, args->call_stack, args->call_combiner,
   3211                              calld->deadline);
   3212   }
   3213   calld->enable_retries = chand->enable_retries;
   3214   calld->send_messages.Init();
   3215   return GRPC_ERROR_NONE;
   3216 }
   3217 
   3218 /* Destructor for call_data */
   3219 static void cc_destroy_call_elem(grpc_call_element* elem,
   3220                                  const grpc_call_final_info* final_info,
   3221                                  grpc_closure* then_schedule_closure) {
   3222   call_data* calld = static_cast<call_data*>(elem->call_data);
   3223   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   3224   if (GPR_LIKELY(chand->deadline_checking_enabled)) {
   3225     grpc_deadline_state_destroy(elem);
   3226   }
   3227   grpc_slice_unref_internal(calld->path);
   3228   calld->retry_throttle_data.reset();
   3229   calld->method_params.reset();
   3230   GRPC_ERROR_UNREF(calld->cancel_error);
   3231   if (GPR_LIKELY(calld->subchannel_call != nullptr)) {
   3232     grpc_subchannel_call_set_cleanup_closure(calld->subchannel_call,
   3233                                              then_schedule_closure);
   3234     then_schedule_closure = nullptr;
   3235     GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call,
   3236                                "client_channel_destroy_call");
   3237   }
   3238   for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
   3239     GPR_ASSERT(calld->pending_batches[i].batch == nullptr);
   3240   }
   3241   if (GPR_LIKELY(calld->pick.connected_subchannel != nullptr)) {
   3242     calld->pick.connected_subchannel.reset();
   3243   }
   3244   for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
   3245     if (calld->pick.subchannel_call_context[i].value != nullptr) {
   3246       calld->pick.subchannel_call_context[i].destroy(
   3247           calld->pick.subchannel_call_context[i].value);
   3248     }
   3249   }
   3250   calld->send_messages.Destroy();
   3251   GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
   3252 }
   3253 
   3254 static void cc_set_pollset_or_pollset_set(grpc_call_element* elem,
   3255                                           grpc_polling_entity* pollent) {
   3256   call_data* calld = static_cast<call_data*>(elem->call_data);
   3257   calld->pollent = pollent;
   3258 }
   3259 
/*************************************************************************
 * EXPORTED SYMBOLS
 */

const grpc_channel_filter grpc_client_channel_filter = {
    cc_start_transport_stream_op_batch,
    cc_start_transport_op,
    sizeof(call_data),
    cc_init_call_elem,
    cc_set_pollset_or_pollset_set,
    cc_destroy_call_elem,
    sizeof(channel_data),
    cc_init_channel_elem,
    cc_destroy_channel_elem,
    cc_get_channel_info,
    "client-channel",
};
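
// A hedged reference for readers: the initializers above are positional and
// are assumed to line up with the grpc_channel_filter fields declared in
// src/core/lib/channel/channel_stack.h, roughly:
//
//   start_transport_stream_op_batch -> cc_start_transport_stream_op_batch
//   start_transport_op              -> cc_start_transport_op
//   sizeof_call_data                -> sizeof(call_data)
//   init_call_elem                  -> cc_init_call_elem
//   set_pollset_or_pollset_set      -> cc_set_pollset_or_pollset_set
//   destroy_call_elem               -> cc_destroy_call_elem
//   sizeof_channel_data             -> sizeof(channel_data)
//   init_channel_elem               -> cc_init_channel_elem
//   destroy_channel_elem            -> cc_destroy_channel_elem
//   get_channel_info                -> cc_get_channel_info
//   name                            -> "client-channel"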

// Runs in the combiner.  Releases the "try_to_connect" ref taken in
// grpc_client_channel_check_connectivity_state() below.
static void try_to_connect_locked(void* arg, grpc_error* error_ignored) {
  channel_data* chand = static_cast<channel_data*>(arg);
  if (chand->lb_policy != nullptr) {
    chand->lb_policy->ExitIdleLocked();
  } else {
    chand->exit_idle_when_lb_policy_arrives = true;
    if (!chand->started_resolving && chand->resolver != nullptr) {
      start_resolving_locked(chand);
    }
  }
  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "try_to_connect");
}

void grpc_client_channel_populate_child_refs(
    grpc_channel_element* elem, grpc_core::ChildRefsList* child_subchannels,
    grpc_core::ChildRefsList* child_channels) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (chand->lb_policy != nullptr) {
    chand->lb_policy->FillChildRefsForChannelz(child_subchannels,
                                               child_channels);
  }
}

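// Note: grpc_client_channel_populate_child_refs above is presumably consumed
// by the channelz support layer to report this channel's child subchannels
// and channels; when no LB policy exists yet, the lists are left untouched.
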
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
    grpc_channel_element* elem, int try_to_connect) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  grpc_connectivity_state out =
      grpc_connectivity_state_check(&chand->state_tracker);
  if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
    GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect");
    GRPC_CLOSURE_SCHED(
        GRPC_CLOSURE_CREATE(try_to_connect_locked, chand,
                            grpc_combiner_scheduler(chand->combiner)),
        GRPC_ERROR_NONE);
  }
  return out;
}
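
// Hedged usage sketch: a surface-level caller holding the client_channel
// element might poll the channel state like this (obtaining `elem` from the
// channel stack is assumed and not shown):
//
//   grpc_connectivity_state st =
//       grpc_client_channel_check_connectivity_state(
//           elem, /*try_to_connect=*/1);
//   // When the channel is IDLE and try_to_connect is nonzero, this also
//   // kicks off name resolution / LB policy exit-idle in the combiner.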

// Tracks one external connectivity watch.  Instances form an intrusive
// singly-linked list hanging off of channel_data, keyed by the on_complete
// closure.
typedef struct external_connectivity_watcher {
  channel_data* chand;
  grpc_polling_entity pollent;
  grpc_closure* on_complete;
  grpc_closure* watcher_timer_init;
  grpc_connectivity_state* state;
  grpc_closure my_closure;
  struct external_connectivity_watcher* next;
} external_connectivity_watcher;

static external_connectivity_watcher* lookup_external_connectivity_watcher(
    channel_data* chand, grpc_closure* on_complete) {
  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  external_connectivity_watcher* w =
      chand->external_connectivity_watcher_list_head;
  while (w != nullptr && w->on_complete != on_complete) {
    w = w->next;
  }
  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
  return w;
}

static void external_connectivity_watcher_list_append(
    channel_data* chand, external_connectivity_watcher* w) {
  GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete));

  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  GPR_ASSERT(!w->next);
  w->next = chand->external_connectivity_watcher_list_head;
  chand->external_connectivity_watcher_list_head = w;
  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
}

static void external_connectivity_watcher_list_remove(
    channel_data* chand, external_connectivity_watcher* to_remove) {
  GPR_ASSERT(
      lookup_external_connectivity_watcher(chand, to_remove->on_complete));
  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  if (to_remove == chand->external_connectivity_watcher_list_head) {
    chand->external_connectivity_watcher_list_head = to_remove->next;
    gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
    return;
  }
  external_connectivity_watcher* w =
      chand->external_connectivity_watcher_list_head;
  while (w != nullptr) {
    if (w->next == to_remove) {
      w->next = w->next->next;
      gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
      return;
    }
    w = w->next;
  }
  GPR_UNREACHABLE_CODE(return);
}

int grpc_client_channel_num_external_connectivity_watchers(
    grpc_channel_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  int count = 0;

  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  external_connectivity_watcher* w =
      chand->external_connectivity_watcher_list_head;
  while (w != nullptr) {
    count++;
    w = w->next;
  }
  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);

  return count;
}

static void on_external_watch_complete_locked(void* arg, grpc_error* error) {
  external_connectivity_watcher* w =
      static_cast<external_connectivity_watcher*>(arg);
  grpc_closure* follow_up = w->on_complete;
  grpc_polling_entity_del_from_pollset_set(&w->pollent,
                                           w->chand->interested_parties);
  GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
                           "external_connectivity_watcher");
  external_connectivity_watcher_list_remove(w->chand, w);
  gpr_free(w);
  GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error));
}

// Runs in the combiner.  If w->state is non-null, registers a new watch;
// otherwise, cancels the existing watch registered with the same
// on_complete closure.
static void watch_connectivity_state_locked(void* arg,
                                            grpc_error* error_ignored) {
  external_connectivity_watcher* w =
      static_cast<external_connectivity_watcher*>(arg);
  external_connectivity_watcher* found = nullptr;
  if (w->state != nullptr) {
    external_connectivity_watcher_list_append(w->chand, w);
    // This assumes that the closure is scheduled on the exec ctx scheduler,
    // so that GRPC_CLOSURE_RUN runs it immediately.
    GRPC_CLOSURE_RUN(w->watcher_timer_init, GRPC_ERROR_NONE);
    GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete_locked, w,
                      grpc_combiner_scheduler(w->chand->combiner));
    grpc_connectivity_state_notify_on_state_change(&w->chand->state_tracker,
                                                   w->state, &w->my_closure);
  } else {
    GPR_ASSERT(w->watcher_timer_init == nullptr);
    found = lookup_external_connectivity_watcher(w->chand, w->on_complete);
    if (found) {
      GPR_ASSERT(found->on_complete == w->on_complete);
      grpc_connectivity_state_notify_on_state_change(
          &found->chand->state_tracker, nullptr, &found->my_closure);
    }
    grpc_polling_entity_del_from_pollset_set(&w->pollent,
                                             w->chand->interested_parties);
    GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
                             "external_connectivity_watcher");
    gpr_free(w);
  }
}

void grpc_client_channel_watch_connectivity_state(
    grpc_channel_element* elem, grpc_polling_entity pollent,
    grpc_connectivity_state* state, grpc_closure* closure,
    grpc_closure* watcher_timer_init) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  external_connectivity_watcher* w =
      static_cast<external_connectivity_watcher*>(gpr_zalloc(sizeof(*w)));
  w->chand = chand;
  w->pollent = pollent;
  w->on_complete = closure;
  w->state = state;
  w->watcher_timer_init = watcher_timer_init;
  grpc_polling_entity_add_to_pollset_set(&w->pollent,
                                         chand->interested_parties);
  GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
                         "external_connectivity_watcher");
  GRPC_CLOSURE_SCHED(
      GRPC_CLOSURE_INIT(&w->my_closure, watch_connectivity_state_locked, w,
                        grpc_combiner_scheduler(chand->combiner)),
      GRPC_ERROR_NONE);
}
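
// Hedged usage sketch: a caller registers a watch and can later cancel it by
// re-invoking with a null state pointer and the same closure (the `elem`,
// `pollent`, `on_done`, and `on_timer_init` values are assumed to come from
// the caller and are not shown being constructed):
//
//   grpc_connectivity_state state = GRPC_CHANNEL_IDLE;
//   grpc_client_channel_watch_connectivity_state(elem, pollent, &state,
//                                                on_done, on_timer_init);
//   // ... to cancel before on_done fires, pass a null state and a null
//   // watcher_timer_init with the same on_done closure:
//   grpc_client_channel_watch_connectivity_state(elem, pollent, nullptr,
//                                                on_done, nullptr);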

grpc_subchannel_call* grpc_client_channel_get_subchannel_call(
    grpc_call_element* elem) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  return calld->subchannel_call;
}
   3459