1 /* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #include <grpc/support/port_platform.h> 20 21 #include "src/core/ext/filters/client_channel/client_channel.h" 22 23 #include <inttypes.h> 24 #include <limits.h> 25 #include <stdbool.h> 26 #include <stdio.h> 27 #include <string.h> 28 29 #include <grpc/support/alloc.h> 30 #include <grpc/support/log.h> 31 #include <grpc/support/string_util.h> 32 #include <grpc/support/sync.h> 33 34 #include "src/core/ext/filters/client_channel/backup_poller.h" 35 #include "src/core/ext/filters/client_channel/http_connect_handshaker.h" 36 #include "src/core/ext/filters/client_channel/lb_policy_registry.h" 37 #include "src/core/ext/filters/client_channel/method_params.h" 38 #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h" 39 #include "src/core/ext/filters/client_channel/resolver_registry.h" 40 #include "src/core/ext/filters/client_channel/retry_throttle.h" 41 #include "src/core/ext/filters/client_channel/subchannel.h" 42 #include "src/core/ext/filters/deadline/deadline_filter.h" 43 #include "src/core/lib/backoff/backoff.h" 44 #include "src/core/lib/channel/channel_args.h" 45 #include "src/core/lib/channel/connected_channel.h" 46 #include "src/core/lib/channel/status_util.h" 47 #include "src/core/lib/gpr/string.h" 48 #include "src/core/lib/gprpp/inlined_vector.h" 49 #include "src/core/lib/gprpp/manual_constructor.h" 50 #include "src/core/lib/iomgr/combiner.h" 51 #include "src/core/lib/iomgr/iomgr.h" 52 #include "src/core/lib/iomgr/polling_entity.h" 53 #include "src/core/lib/profiling/timers.h" 54 #include "src/core/lib/slice/slice_internal.h" 55 #include "src/core/lib/slice/slice_string_helpers.h" 56 #include "src/core/lib/surface/channel.h" 57 #include "src/core/lib/transport/connectivity_state.h" 58 #include "src/core/lib/transport/error_utils.h" 59 #include "src/core/lib/transport/metadata.h" 60 #include "src/core/lib/transport/metadata_batch.h" 61 #include "src/core/lib/transport/service_config.h" 62 #include "src/core/lib/transport/static_metadata.h" 63 #include "src/core/lib/transport/status_metadata.h" 64 65 using grpc_core::internal::ClientChannelMethodParams; 66 using grpc_core::internal::ServerRetryThrottleData; 67 68 /* Client channel implementation */ 69 70 // By default, we buffer 256 KiB per RPC for retries. 71 // TODO(roth): Do we have any data to suggest a better value? 72 #define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10) 73 74 // This value was picked arbitrarily. It can be changed if there is 75 // any even moderately compelling reason to do so. 76 #define RETRY_BACKOFF_JITTER 0.2 77 78 grpc_core::TraceFlag grpc_client_channel_trace(false, "client_channel"); 79 80 /************************************************************************* 81 * CHANNEL-WIDE FUNCTIONS 82 */ 83 84 struct external_connectivity_watcher; 85 86 typedef grpc_core::SliceHashTable< 87 grpc_core::RefCountedPtr<ClientChannelMethodParams>> 88 MethodParamsTable; 89 90 typedef struct client_channel_channel_data { 91 grpc_core::OrphanablePtr<grpc_core::Resolver> resolver; 92 bool started_resolving; 93 bool deadline_checking_enabled; 94 grpc_client_channel_factory* client_channel_factory; 95 bool enable_retries; 96 size_t per_rpc_retry_buffer_size; 97 98 /** combiner protecting all variables below in this data structure */ 99 grpc_combiner* combiner; 100 /** currently active load balancer */ 101 grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> lb_policy; 102 /** retry throttle data */ 103 grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data; 104 /** maps method names to method_parameters structs */ 105 grpc_core::RefCountedPtr<MethodParamsTable> method_params_table; 106 /** incoming resolver result - set by resolver.next() */ 107 grpc_channel_args* resolver_result; 108 /** a list of closures that are all waiting for resolver result to come in */ 109 grpc_closure_list waiting_for_resolver_result_closures; 110 /** resolver callback */ 111 grpc_closure on_resolver_result_changed; 112 /** connectivity state being tracked */ 113 grpc_connectivity_state_tracker state_tracker; 114 /** when an lb_policy arrives, should we try to exit idle */ 115 bool exit_idle_when_lb_policy_arrives; 116 /** owning stack */ 117 grpc_channel_stack* owning_stack; 118 /** interested parties (owned) */ 119 grpc_pollset_set* interested_parties; 120 121 /* external_connectivity_watcher_list head is guarded by its own mutex, since 122 * counts need to be grabbed immediately without polling on a cq */ 123 gpr_mu external_connectivity_watcher_list_mu; 124 struct external_connectivity_watcher* external_connectivity_watcher_list_head; 125 126 /* the following properties are guarded by a mutex since APIs require them 127 to be instantaneously available */ 128 gpr_mu info_mu; 129 grpc_core::UniquePtr<char> info_lb_policy_name; 130 /** service config in JSON form */ 131 grpc_core::UniquePtr<char> info_service_config_json; 132 } channel_data; 133 134 typedef struct { 135 channel_data* chand; 136 /** used as an identifier, don't dereference it because the LB policy may be 137 * non-existing when the callback is run */ 138 grpc_core::LoadBalancingPolicy* lb_policy; 139 grpc_closure closure; 140 } reresolution_request_args; 141 142 /** We create one watcher for each new lb_policy that is returned from a 143 resolver, to watch for state changes from the lb_policy. When a state 144 change is seen, we update the channel, and create a new watcher. */ 145 typedef struct { 146 channel_data* chand; 147 grpc_closure on_changed; 148 grpc_connectivity_state state; 149 grpc_core::LoadBalancingPolicy* lb_policy; 150 } lb_policy_connectivity_watcher; 151 152 static void watch_lb_policy_locked(channel_data* chand, 153 grpc_core::LoadBalancingPolicy* lb_policy, 154 grpc_connectivity_state current_state); 155 156 static void set_channel_connectivity_state_locked(channel_data* chand, 157 grpc_connectivity_state state, 158 grpc_error* error, 159 const char* reason) { 160 /* TODO: Improve failure handling: 161 * - Make it possible for policies to return GRPC_CHANNEL_TRANSIENT_FAILURE. 162 * - Hand over pending picks from old policies during the switch that happens 163 * when resolver provides an update. */ 164 if (chand->lb_policy != nullptr) { 165 if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) { 166 /* cancel picks with wait_for_ready=false */ 167 chand->lb_policy->CancelMatchingPicksLocked( 168 /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY, 169 /* check= */ 0, GRPC_ERROR_REF(error)); 170 } else if (state == GRPC_CHANNEL_SHUTDOWN) { 171 /* cancel all picks */ 172 chand->lb_policy->CancelMatchingPicksLocked(/* mask= */ 0, /* check= */ 0, 173 GRPC_ERROR_REF(error)); 174 } 175 } 176 if (grpc_client_channel_trace.enabled()) { 177 gpr_log(GPR_INFO, "chand=%p: setting connectivity state to %s", chand, 178 grpc_connectivity_state_name(state)); 179 } 180 grpc_connectivity_state_set(&chand->state_tracker, state, error, reason); 181 } 182 183 static void on_lb_policy_state_changed_locked(void* arg, grpc_error* error) { 184 lb_policy_connectivity_watcher* w = 185 static_cast<lb_policy_connectivity_watcher*>(arg); 186 /* check if the notification is for the latest policy */ 187 if (w->lb_policy == w->chand->lb_policy.get()) { 188 if (grpc_client_channel_trace.enabled()) { 189 gpr_log(GPR_INFO, "chand=%p: lb_policy=%p state changed to %s", w->chand, 190 w->lb_policy, grpc_connectivity_state_name(w->state)); 191 } 192 set_channel_connectivity_state_locked(w->chand, w->state, 193 GRPC_ERROR_REF(error), "lb_changed"); 194 if (w->state != GRPC_CHANNEL_SHUTDOWN) { 195 watch_lb_policy_locked(w->chand, w->lb_policy, w->state); 196 } 197 } 198 GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack, "watch_lb_policy"); 199 gpr_free(w); 200 } 201 202 static void watch_lb_policy_locked(channel_data* chand, 203 grpc_core::LoadBalancingPolicy* lb_policy, 204 grpc_connectivity_state current_state) { 205 lb_policy_connectivity_watcher* w = 206 static_cast<lb_policy_connectivity_watcher*>(gpr_malloc(sizeof(*w))); 207 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy"); 208 w->chand = chand; 209 GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w, 210 grpc_combiner_scheduler(chand->combiner)); 211 w->state = current_state; 212 w->lb_policy = lb_policy; 213 lb_policy->NotifyOnStateChangeLocked(&w->state, &w->on_changed); 214 } 215 216 static void start_resolving_locked(channel_data* chand) { 217 if (grpc_client_channel_trace.enabled()) { 218 gpr_log(GPR_INFO, "chand=%p: starting name resolution", chand); 219 } 220 GPR_ASSERT(!chand->started_resolving); 221 chand->started_resolving = true; 222 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); 223 chand->resolver->NextLocked(&chand->resolver_result, 224 &chand->on_resolver_result_changed); 225 } 226 227 typedef struct { 228 char* server_name; 229 grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data; 230 } service_config_parsing_state; 231 232 static void parse_retry_throttle_params( 233 const grpc_json* field, service_config_parsing_state* parsing_state) { 234 if (strcmp(field->key, "retryThrottling") == 0) { 235 if (parsing_state->retry_throttle_data != nullptr) return; // Duplicate. 236 if (field->type != GRPC_JSON_OBJECT) return; 237 int max_milli_tokens = 0; 238 int milli_token_ratio = 0; 239 for (grpc_json* sub_field = field->child; sub_field != nullptr; 240 sub_field = sub_field->next) { 241 if (sub_field->key == nullptr) return; 242 if (strcmp(sub_field->key, "maxTokens") == 0) { 243 if (max_milli_tokens != 0) return; // Duplicate. 244 if (sub_field->type != GRPC_JSON_NUMBER) return; 245 max_milli_tokens = gpr_parse_nonnegative_int(sub_field->value); 246 if (max_milli_tokens == -1) return; 247 max_milli_tokens *= 1000; 248 } else if (strcmp(sub_field->key, "tokenRatio") == 0) { 249 if (milli_token_ratio != 0) return; // Duplicate. 250 if (sub_field->type != GRPC_JSON_NUMBER) return; 251 // We support up to 3 decimal digits. 252 size_t whole_len = strlen(sub_field->value); 253 uint32_t multiplier = 1; 254 uint32_t decimal_value = 0; 255 const char* decimal_point = strchr(sub_field->value, '.'); 256 if (decimal_point != nullptr) { 257 whole_len = static_cast<size_t>(decimal_point - sub_field->value); 258 multiplier = 1000; 259 size_t decimal_len = strlen(decimal_point + 1); 260 if (decimal_len > 3) decimal_len = 3; 261 if (!gpr_parse_bytes_to_uint32(decimal_point + 1, decimal_len, 262 &decimal_value)) { 263 return; 264 } 265 uint32_t decimal_multiplier = 1; 266 for (size_t i = 0; i < (3 - decimal_len); ++i) { 267 decimal_multiplier *= 10; 268 } 269 decimal_value *= decimal_multiplier; 270 } 271 uint32_t whole_value; 272 if (!gpr_parse_bytes_to_uint32(sub_field->value, whole_len, 273 &whole_value)) { 274 return; 275 } 276 milli_token_ratio = 277 static_cast<int>((whole_value * multiplier) + decimal_value); 278 if (milli_token_ratio <= 0) return; 279 } 280 } 281 parsing_state->retry_throttle_data = 282 grpc_core::internal::ServerRetryThrottleMap::GetDataForServer( 283 parsing_state->server_name, max_milli_tokens, milli_token_ratio); 284 } 285 } 286 287 // Invoked from the resolver NextLocked() callback when the resolver 288 // is shutting down. 289 static void on_resolver_shutdown_locked(channel_data* chand, 290 grpc_error* error) { 291 if (grpc_client_channel_trace.enabled()) { 292 gpr_log(GPR_INFO, "chand=%p: shutting down", chand); 293 } 294 if (chand->lb_policy != nullptr) { 295 if (grpc_client_channel_trace.enabled()) { 296 gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", chand, 297 chand->lb_policy.get()); 298 } 299 grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(), 300 chand->interested_parties); 301 chand->lb_policy.reset(); 302 } 303 if (chand->resolver != nullptr) { 304 // This should never happen; it can only be triggered by a resolver 305 // implementation spotaneously deciding to report shutdown without 306 // being orphaned. This code is included just to be defensive. 307 if (grpc_client_channel_trace.enabled()) { 308 gpr_log(GPR_INFO, "chand=%p: spontaneous shutdown from resolver %p", 309 chand, chand->resolver.get()); 310 } 311 chand->resolver.reset(); 312 set_channel_connectivity_state_locked( 313 chand, GRPC_CHANNEL_SHUTDOWN, 314 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( 315 "Resolver spontaneous shutdown", &error, 1), 316 "resolver_spontaneous_shutdown"); 317 } 318 grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures, 319 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( 320 "Channel disconnected", &error, 1)); 321 GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures); 322 GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "resolver"); 323 grpc_channel_args_destroy(chand->resolver_result); 324 chand->resolver_result = nullptr; 325 GRPC_ERROR_UNREF(error); 326 } 327 328 // Returns the LB policy name from the resolver result. 329 static grpc_core::UniquePtr<char> 330 get_lb_policy_name_from_resolver_result_locked(channel_data* chand) { 331 // Find LB policy name in channel args. 332 const grpc_arg* channel_arg = 333 grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME); 334 const char* lb_policy_name = grpc_channel_arg_get_string(channel_arg); 335 // Special case: If at least one balancer address is present, we use 336 // the grpclb policy, regardless of what the resolver actually specified. 337 channel_arg = 338 grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES); 339 if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) { 340 grpc_lb_addresses* addresses = 341 static_cast<grpc_lb_addresses*>(channel_arg->value.pointer.p); 342 if (grpc_lb_addresses_contains_balancer_address(*addresses)) { 343 if (lb_policy_name != nullptr && 344 gpr_stricmp(lb_policy_name, "grpclb") != 0) { 345 gpr_log(GPR_INFO, 346 "resolver requested LB policy %s but provided at least one " 347 "balancer address -- forcing use of grpclb LB policy", 348 lb_policy_name); 349 } 350 lb_policy_name = "grpclb"; 351 } 352 } 353 // Use pick_first if nothing was specified and we didn't select grpclb 354 // above. 355 if (lb_policy_name == nullptr) lb_policy_name = "pick_first"; 356 return grpc_core::UniquePtr<char>(gpr_strdup(lb_policy_name)); 357 } 358 359 static void request_reresolution_locked(void* arg, grpc_error* error) { 360 reresolution_request_args* args = 361 static_cast<reresolution_request_args*>(arg); 362 channel_data* chand = args->chand; 363 // If this invocation is for a stale LB policy, treat it as an LB shutdown 364 // signal. 365 if (args->lb_policy != chand->lb_policy.get() || error != GRPC_ERROR_NONE || 366 chand->resolver == nullptr) { 367 GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "re-resolution"); 368 gpr_free(args); 369 return; 370 } 371 if (grpc_client_channel_trace.enabled()) { 372 gpr_log(GPR_INFO, "chand=%p: started name re-resolving", chand); 373 } 374 chand->resolver->RequestReresolutionLocked(); 375 // Give back the closure to the LB policy. 376 chand->lb_policy->SetReresolutionClosureLocked(&args->closure); 377 } 378 379 // Creates a new LB policy, replacing any previous one. 380 // If the new policy is created successfully, sets *connectivity_state and 381 // *connectivity_error to its initial connectivity state; otherwise, 382 // leaves them unchanged. 383 static void create_new_lb_policy_locked( 384 channel_data* chand, char* lb_policy_name, 385 grpc_connectivity_state* connectivity_state, 386 grpc_error** connectivity_error) { 387 grpc_core::LoadBalancingPolicy::Args lb_policy_args; 388 lb_policy_args.combiner = chand->combiner; 389 lb_policy_args.client_channel_factory = chand->client_channel_factory; 390 lb_policy_args.args = chand->resolver_result; 391 grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> new_lb_policy = 392 grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( 393 lb_policy_name, lb_policy_args); 394 if (GPR_UNLIKELY(new_lb_policy == nullptr)) { 395 gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name); 396 } else { 397 if (grpc_client_channel_trace.enabled()) { 398 gpr_log(GPR_INFO, "chand=%p: created new LB policy \"%s\" (%p)", chand, 399 lb_policy_name, new_lb_policy.get()); 400 } 401 // Swap out the LB policy and update the fds in 402 // chand->interested_parties. 403 if (chand->lb_policy != nullptr) { 404 if (grpc_client_channel_trace.enabled()) { 405 gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", chand, 406 chand->lb_policy.get()); 407 } 408 grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(), 409 chand->interested_parties); 410 chand->lb_policy->HandOffPendingPicksLocked(new_lb_policy.get()); 411 } 412 chand->lb_policy = std::move(new_lb_policy); 413 grpc_pollset_set_add_pollset_set(chand->lb_policy->interested_parties(), 414 chand->interested_parties); 415 // Set up re-resolution callback. 416 reresolution_request_args* args = 417 static_cast<reresolution_request_args*>(gpr_zalloc(sizeof(*args))); 418 args->chand = chand; 419 args->lb_policy = chand->lb_policy.get(); 420 GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args, 421 grpc_combiner_scheduler(chand->combiner)); 422 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution"); 423 chand->lb_policy->SetReresolutionClosureLocked(&args->closure); 424 // Get the new LB policy's initial connectivity state and start a 425 // connectivity watch. 426 GRPC_ERROR_UNREF(*connectivity_error); 427 *connectivity_state = 428 chand->lb_policy->CheckConnectivityLocked(connectivity_error); 429 if (chand->exit_idle_when_lb_policy_arrives) { 430 chand->lb_policy->ExitIdleLocked(); 431 chand->exit_idle_when_lb_policy_arrives = false; 432 } 433 watch_lb_policy_locked(chand, chand->lb_policy.get(), *connectivity_state); 434 } 435 } 436 437 // Returns the service config (as a JSON string) from the resolver result. 438 // Also updates state in chand. 439 static grpc_core::UniquePtr<char> 440 get_service_config_from_resolver_result_locked(channel_data* chand) { 441 const grpc_arg* channel_arg = 442 grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG); 443 const char* service_config_json = grpc_channel_arg_get_string(channel_arg); 444 if (service_config_json != nullptr) { 445 if (grpc_client_channel_trace.enabled()) { 446 gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"", 447 chand, service_config_json); 448 } 449 grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config = 450 grpc_core::ServiceConfig::Create(service_config_json); 451 if (service_config != nullptr) { 452 if (chand->enable_retries) { 453 channel_arg = 454 grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI); 455 const char* server_uri = grpc_channel_arg_get_string(channel_arg); 456 GPR_ASSERT(server_uri != nullptr); 457 grpc_uri* uri = grpc_uri_parse(server_uri, true); 458 GPR_ASSERT(uri->path[0] != '\0'); 459 service_config_parsing_state parsing_state; 460 parsing_state.server_name = 461 uri->path[0] == '/' ? uri->path + 1 : uri->path; 462 service_config->ParseGlobalParams(parse_retry_throttle_params, 463 &parsing_state); 464 grpc_uri_destroy(uri); 465 chand->retry_throttle_data = 466 std::move(parsing_state.retry_throttle_data); 467 } 468 chand->method_params_table = service_config->CreateMethodConfigTable( 469 ClientChannelMethodParams::CreateFromJson); 470 } 471 } 472 return grpc_core::UniquePtr<char>(gpr_strdup(service_config_json)); 473 } 474 475 // Callback invoked when a resolver result is available. 476 static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { 477 channel_data* chand = static_cast<channel_data*>(arg); 478 if (grpc_client_channel_trace.enabled()) { 479 const char* disposition = 480 chand->resolver_result != nullptr 481 ? "" 482 : (error == GRPC_ERROR_NONE ? " (transient error)" 483 : " (resolver shutdown)"); 484 gpr_log(GPR_INFO, 485 "chand=%p: got resolver result: resolver_result=%p error=%s%s", 486 chand, chand->resolver_result, grpc_error_string(error), 487 disposition); 488 } 489 // Handle shutdown. 490 if (error != GRPC_ERROR_NONE || chand->resolver == nullptr) { 491 on_resolver_shutdown_locked(chand, GRPC_ERROR_REF(error)); 492 return; 493 } 494 // Data used to set the channel's connectivity state. 495 bool set_connectivity_state = true; 496 grpc_connectivity_state connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE; 497 grpc_error* connectivity_error = 498 GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy"); 499 // chand->resolver_result will be null in the case of a transient 500 // resolution error. In that case, we don't have any new result to 501 // process, which means that we keep using the previous result (if any). 502 if (chand->resolver_result == nullptr) { 503 if (grpc_client_channel_trace.enabled()) { 504 gpr_log(GPR_INFO, "chand=%p: resolver transient failure", chand); 505 } 506 } else { 507 grpc_core::UniquePtr<char> lb_policy_name = 508 get_lb_policy_name_from_resolver_result_locked(chand); 509 // Check to see if we're already using the right LB policy. 510 // Note: It's safe to use chand->info_lb_policy_name here without 511 // taking a lock on chand->info_mu, because this function is the 512 // only thing that modifies its value, and it can only be invoked 513 // once at any given time. 514 bool lb_policy_name_changed = chand->info_lb_policy_name == nullptr || 515 gpr_stricmp(chand->info_lb_policy_name.get(), 516 lb_policy_name.get()) != 0; 517 if (chand->lb_policy != nullptr && !lb_policy_name_changed) { 518 // Continue using the same LB policy. Update with new addresses. 519 if (grpc_client_channel_trace.enabled()) { 520 gpr_log(GPR_INFO, "chand=%p: updating existing LB policy \"%s\" (%p)", 521 chand, lb_policy_name.get(), chand->lb_policy.get()); 522 } 523 chand->lb_policy->UpdateLocked(*chand->resolver_result); 524 // No need to set the channel's connectivity state; the existing 525 // watch on the LB policy will take care of that. 526 set_connectivity_state = false; 527 } else { 528 // Instantiate new LB policy. 529 create_new_lb_policy_locked(chand, lb_policy_name.get(), 530 &connectivity_state, &connectivity_error); 531 } 532 // Find service config. 533 grpc_core::UniquePtr<char> service_config_json = 534 get_service_config_from_resolver_result_locked(chand); 535 // Swap out the data used by cc_get_channel_info(). 536 gpr_mu_lock(&chand->info_mu); 537 chand->info_lb_policy_name = std::move(lb_policy_name); 538 chand->info_service_config_json = std::move(service_config_json); 539 gpr_mu_unlock(&chand->info_mu); 540 // Clean up. 541 grpc_channel_args_destroy(chand->resolver_result); 542 chand->resolver_result = nullptr; 543 } 544 // Set the channel's connectivity state if needed. 545 if (set_connectivity_state) { 546 set_channel_connectivity_state_locked( 547 chand, connectivity_state, connectivity_error, "resolver_result"); 548 } else { 549 GRPC_ERROR_UNREF(connectivity_error); 550 } 551 // Invoke closures that were waiting for results and renew the watch. 552 GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures); 553 chand->resolver->NextLocked(&chand->resolver_result, 554 &chand->on_resolver_result_changed); 555 } 556 557 static void start_transport_op_locked(void* arg, grpc_error* error_ignored) { 558 grpc_transport_op* op = static_cast<grpc_transport_op*>(arg); 559 grpc_channel_element* elem = 560 static_cast<grpc_channel_element*>(op->handler_private.extra_arg); 561 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 562 563 if (op->on_connectivity_state_change != nullptr) { 564 grpc_connectivity_state_notify_on_state_change( 565 &chand->state_tracker, op->connectivity_state, 566 op->on_connectivity_state_change); 567 op->on_connectivity_state_change = nullptr; 568 op->connectivity_state = nullptr; 569 } 570 571 if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) { 572 if (chand->lb_policy == nullptr) { 573 grpc_error* error = 574 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"); 575 GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error)); 576 GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error); 577 } else { 578 grpc_error* error = GRPC_ERROR_NONE; 579 grpc_core::LoadBalancingPolicy::PickState pick_state; 580 pick_state.initial_metadata = nullptr; 581 pick_state.initial_metadata_flags = 0; 582 pick_state.on_complete = nullptr; 583 memset(&pick_state.subchannel_call_context, 0, 584 sizeof(pick_state.subchannel_call_context)); 585 pick_state.user_data = nullptr; 586 // Pick must return synchronously, because pick_state.on_complete is null. 587 GPR_ASSERT(chand->lb_policy->PickLocked(&pick_state, &error)); 588 if (pick_state.connected_subchannel != nullptr) { 589 pick_state.connected_subchannel->Ping(op->send_ping.on_initiate, 590 op->send_ping.on_ack); 591 } else { 592 if (error == GRPC_ERROR_NONE) { 593 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( 594 "LB policy dropped call on ping"); 595 } 596 GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error)); 597 GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error); 598 } 599 op->bind_pollset = nullptr; 600 } 601 op->send_ping.on_initiate = nullptr; 602 op->send_ping.on_ack = nullptr; 603 } 604 605 if (op->disconnect_with_error != GRPC_ERROR_NONE) { 606 if (chand->resolver != nullptr) { 607 set_channel_connectivity_state_locked( 608 chand, GRPC_CHANNEL_SHUTDOWN, 609 GRPC_ERROR_REF(op->disconnect_with_error), "disconnect"); 610 chand->resolver.reset(); 611 if (!chand->started_resolving) { 612 grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures, 613 GRPC_ERROR_REF(op->disconnect_with_error)); 614 GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures); 615 } 616 if (chand->lb_policy != nullptr) { 617 grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(), 618 chand->interested_parties); 619 chand->lb_policy.reset(); 620 } 621 } 622 GRPC_ERROR_UNREF(op->disconnect_with_error); 623 } 624 625 if (op->reset_connect_backoff) { 626 if (chand->resolver != nullptr) { 627 chand->resolver->ResetBackoffLocked(); 628 chand->resolver->RequestReresolutionLocked(); 629 } 630 if (chand->lb_policy != nullptr) { 631 chand->lb_policy->ResetBackoffLocked(); 632 } 633 } 634 635 GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "start_transport_op"); 636 637 GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE); 638 } 639 640 static void cc_start_transport_op(grpc_channel_element* elem, 641 grpc_transport_op* op) { 642 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 643 644 GPR_ASSERT(op->set_accept_stream == false); 645 if (op->bind_pollset != nullptr) { 646 grpc_pollset_set_add_pollset(chand->interested_parties, op->bind_pollset); 647 } 648 649 op->handler_private.extra_arg = elem; 650 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op"); 651 GRPC_CLOSURE_SCHED( 652 GRPC_CLOSURE_INIT(&op->handler_private.closure, start_transport_op_locked, 653 op, grpc_combiner_scheduler(chand->combiner)), 654 GRPC_ERROR_NONE); 655 } 656 657 static void cc_get_channel_info(grpc_channel_element* elem, 658 const grpc_channel_info* info) { 659 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 660 gpr_mu_lock(&chand->info_mu); 661 if (info->lb_policy_name != nullptr) { 662 *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name.get()); 663 } 664 if (info->service_config_json != nullptr) { 665 *info->service_config_json = 666 gpr_strdup(chand->info_service_config_json.get()); 667 } 668 gpr_mu_unlock(&chand->info_mu); 669 } 670 671 /* Constructor for channel_data */ 672 static grpc_error* cc_init_channel_elem(grpc_channel_element* elem, 673 grpc_channel_element_args* args) { 674 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 675 GPR_ASSERT(args->is_last); 676 GPR_ASSERT(elem->filter == &grpc_client_channel_filter); 677 // Initialize data members. 678 chand->combiner = grpc_combiner_create(); 679 gpr_mu_init(&chand->info_mu); 680 gpr_mu_init(&chand->external_connectivity_watcher_list_mu); 681 682 gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); 683 chand->external_connectivity_watcher_list_head = nullptr; 684 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); 685 686 chand->owning_stack = args->channel_stack; 687 GRPC_CLOSURE_INIT(&chand->on_resolver_result_changed, 688 on_resolver_result_changed_locked, chand, 689 grpc_combiner_scheduler(chand->combiner)); 690 chand->interested_parties = grpc_pollset_set_create(); 691 grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, 692 "client_channel"); 693 grpc_client_channel_start_backup_polling(chand->interested_parties); 694 // Record max per-RPC retry buffer size. 695 const grpc_arg* arg = grpc_channel_args_find( 696 args->channel_args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE); 697 chand->per_rpc_retry_buffer_size = (size_t)grpc_channel_arg_get_integer( 698 arg, {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX}); 699 // Record enable_retries. 700 arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_ENABLE_RETRIES); 701 chand->enable_retries = grpc_channel_arg_get_bool(arg, true); 702 // Record client channel factory. 703 arg = grpc_channel_args_find(args->channel_args, 704 GRPC_ARG_CLIENT_CHANNEL_FACTORY); 705 if (arg == nullptr) { 706 return GRPC_ERROR_CREATE_FROM_STATIC_STRING( 707 "Missing client channel factory in args for client channel filter"); 708 } 709 if (arg->type != GRPC_ARG_POINTER) { 710 return GRPC_ERROR_CREATE_FROM_STATIC_STRING( 711 "client channel factory arg must be a pointer"); 712 } 713 grpc_client_channel_factory_ref( 714 static_cast<grpc_client_channel_factory*>(arg->value.pointer.p)); 715 chand->client_channel_factory = 716 static_cast<grpc_client_channel_factory*>(arg->value.pointer.p); 717 // Get server name to resolve, using proxy mapper if needed. 718 arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI); 719 if (arg == nullptr) { 720 return GRPC_ERROR_CREATE_FROM_STATIC_STRING( 721 "Missing server uri in args for client channel filter"); 722 } 723 if (arg->type != GRPC_ARG_STRING) { 724 return GRPC_ERROR_CREATE_FROM_STATIC_STRING( 725 "server uri arg must be a string"); 726 } 727 char* proxy_name = nullptr; 728 grpc_channel_args* new_args = nullptr; 729 grpc_proxy_mappers_map_name(arg->value.string, args->channel_args, 730 &proxy_name, &new_args); 731 // Instantiate resolver. 732 chand->resolver = grpc_core::ResolverRegistry::CreateResolver( 733 proxy_name != nullptr ? proxy_name : arg->value.string, 734 new_args != nullptr ? new_args : args->channel_args, 735 chand->interested_parties, chand->combiner); 736 if (proxy_name != nullptr) gpr_free(proxy_name); 737 if (new_args != nullptr) grpc_channel_args_destroy(new_args); 738 if (chand->resolver == nullptr) { 739 return GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed"); 740 } 741 chand->deadline_checking_enabled = 742 grpc_deadline_checking_enabled(args->channel_args); 743 return GRPC_ERROR_NONE; 744 } 745 746 /* Destructor for channel_data */ 747 static void cc_destroy_channel_elem(grpc_channel_element* elem) { 748 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 749 if (chand->resolver != nullptr) { 750 // The only way we can get here is if we never started resolving, 751 // because we take a ref to the channel stack when we start 752 // resolving and do not release it until the resolver callback is 753 // invoked after the resolver shuts down. 754 chand->resolver.reset(); 755 } 756 if (chand->client_channel_factory != nullptr) { 757 grpc_client_channel_factory_unref(chand->client_channel_factory); 758 } 759 if (chand->lb_policy != nullptr) { 760 grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(), 761 chand->interested_parties); 762 chand->lb_policy.reset(); 763 } 764 // TODO(roth): Once we convert the filter API to C++, there will no 765 // longer be any need to explicitly reset these smart pointer data members. 766 chand->info_lb_policy_name.reset(); 767 chand->info_service_config_json.reset(); 768 chand->retry_throttle_data.reset(); 769 chand->method_params_table.reset(); 770 grpc_client_channel_stop_backup_polling(chand->interested_parties); 771 grpc_connectivity_state_destroy(&chand->state_tracker); 772 grpc_pollset_set_destroy(chand->interested_parties); 773 GRPC_COMBINER_UNREF(chand->combiner, "client_channel"); 774 gpr_mu_destroy(&chand->info_mu); 775 gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu); 776 } 777 778 /************************************************************************* 779 * PER-CALL FUNCTIONS 780 */ 781 782 // Max number of batches that can be pending on a call at any given 783 // time. This includes one batch for each of the following ops: 784 // recv_initial_metadata 785 // send_initial_metadata 786 // recv_message 787 // send_message 788 // recv_trailing_metadata 789 // send_trailing_metadata 790 #define MAX_PENDING_BATCHES 6 791 792 // Retry support: 793 // 794 // In order to support retries, we act as a proxy for stream op batches. 795 // When we get a batch from the surface, we add it to our list of pending 796 // batches, and we then use those batches to construct separate "child" 797 // batches to be started on the subchannel call. When the child batches 798 // return, we then decide which pending batches have been completed and 799 // schedule their callbacks accordingly. If a subchannel call fails and 800 // we want to retry it, we do a new pick and start again, constructing 801 // new "child" batches for the new subchannel call. 802 // 803 // Note that retries are committed when receiving data from the server 804 // (except for Trailers-Only responses). However, there may be many 805 // send ops started before receiving any data, so we may have already 806 // completed some number of send ops (and returned the completions up to 807 // the surface) by the time we realize that we need to retry. To deal 808 // with this, we cache data for send ops, so that we can replay them on a 809 // different subchannel call even after we have completed the original 810 // batches. 811 // 812 // There are two sets of data to maintain: 813 // - In call_data (in the parent channel), we maintain a list of pending 814 // ops and cached data for send ops. 815 // - In the subchannel call, we maintain state to indicate what ops have 816 // already been sent down to that call. 817 // 818 // When constructing the "child" batches, we compare those two sets of 819 // data to see which batches need to be sent to the subchannel call. 820 821 // TODO(roth): In subsequent PRs: 822 // - add support for transparent retries (including initial metadata) 823 // - figure out how to record stats in census for retries 824 // (census filter is on top of this one) 825 // - add census stats for retries 826 827 // State used for starting a retryable batch on a subchannel call. 828 // This provides its own grpc_transport_stream_op_batch and other data 829 // structures needed to populate the ops in the batch. 830 // We allocate one struct on the arena for each attempt at starting a 831 // batch on a given subchannel call. 832 typedef struct { 833 gpr_refcount refs; 834 grpc_call_element* elem; 835 grpc_subchannel_call* subchannel_call; // Holds a ref. 836 // The batch to use in the subchannel call. 837 // Its payload field points to subchannel_call_retry_state.batch_payload. 838 grpc_transport_stream_op_batch batch; 839 // For intercepting on_complete. 840 grpc_closure on_complete; 841 } subchannel_batch_data; 842 843 // Retry state associated with a subchannel call. 844 // Stored in the parent_data of the subchannel call object. 845 typedef struct { 846 // subchannel_batch_data.batch.payload points to this. 847 grpc_transport_stream_op_batch_payload batch_payload; 848 // For send_initial_metadata. 849 // Note that we need to make a copy of the initial metadata for each 850 // subchannel call instead of just referring to the copy in call_data, 851 // because filters in the subchannel stack will probably add entries, 852 // so we need to start in a pristine state for each attempt of the call. 853 grpc_linked_mdelem* send_initial_metadata_storage; 854 grpc_metadata_batch send_initial_metadata; 855 // For send_message. 856 grpc_core::ManualConstructor<grpc_core::ByteStreamCache::CachingByteStream> 857 send_message; 858 // For send_trailing_metadata. 859 grpc_linked_mdelem* send_trailing_metadata_storage; 860 grpc_metadata_batch send_trailing_metadata; 861 // For intercepting recv_initial_metadata. 862 grpc_metadata_batch recv_initial_metadata; 863 grpc_closure recv_initial_metadata_ready; 864 bool trailing_metadata_available; 865 // For intercepting recv_message. 866 grpc_closure recv_message_ready; 867 grpc_core::OrphanablePtr<grpc_core::ByteStream> recv_message; 868 // For intercepting recv_trailing_metadata. 869 grpc_metadata_batch recv_trailing_metadata; 870 grpc_transport_stream_stats collect_stats; 871 grpc_closure recv_trailing_metadata_ready; 872 // These fields indicate which ops have been started and completed on 873 // this subchannel call. 874 size_t started_send_message_count; 875 size_t completed_send_message_count; 876 size_t started_recv_message_count; 877 size_t completed_recv_message_count; 878 bool started_send_initial_metadata : 1; 879 bool completed_send_initial_metadata : 1; 880 bool started_send_trailing_metadata : 1; 881 bool completed_send_trailing_metadata : 1; 882 bool started_recv_initial_metadata : 1; 883 bool completed_recv_initial_metadata : 1; 884 bool started_recv_trailing_metadata : 1; 885 bool completed_recv_trailing_metadata : 1; 886 // State for callback processing. 887 bool retry_dispatched : 1; 888 subchannel_batch_data* recv_initial_metadata_ready_deferred_batch; 889 grpc_error* recv_initial_metadata_error; 890 subchannel_batch_data* recv_message_ready_deferred_batch; 891 grpc_error* recv_message_error; 892 subchannel_batch_data* recv_trailing_metadata_internal_batch; 893 } subchannel_call_retry_state; 894 895 // Pending batches stored in call data. 896 typedef struct { 897 // The pending batch. If nullptr, this slot is empty. 898 grpc_transport_stream_op_batch* batch; 899 // Indicates whether payload for send ops has been cached in call data. 900 bool send_ops_cached; 901 } pending_batch; 902 903 /** Call data. Holds a pointer to grpc_subchannel_call and the 904 associated machinery to create such a pointer. 905 Handles queueing of stream ops until a call object is ready, waiting 906 for initial metadata before trying to create a call object, 907 and handling cancellation gracefully. */ 908 typedef struct client_channel_call_data { 909 // State for handling deadlines. 910 // The code in deadline_filter.c requires this to be the first field. 911 // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state 912 // and this struct both independently store pointers to the call stack 913 // and call combiner. If/when we have time, find a way to avoid this 914 // without breaking the grpc_deadline_state abstraction. 915 grpc_deadline_state deadline_state; 916 917 grpc_slice path; // Request path. 918 gpr_timespec call_start_time; 919 grpc_millis deadline; 920 gpr_arena* arena; 921 grpc_call_stack* owning_call; 922 grpc_call_combiner* call_combiner; 923 924 grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data; 925 grpc_core::RefCountedPtr<ClientChannelMethodParams> method_params; 926 927 grpc_subchannel_call* subchannel_call; 928 929 // Set when we get a cancel_stream op. 930 grpc_error* cancel_error; 931 932 grpc_core::LoadBalancingPolicy::PickState pick; 933 grpc_closure pick_closure; 934 grpc_closure pick_cancel_closure; 935 936 // state needed to support channelz interception of recv trailing metadata. 937 grpc_closure recv_trailing_metadata_ready_channelz; 938 grpc_closure* original_recv_trailing_metadata; 939 grpc_metadata_batch* recv_trailing_metadata; 940 941 grpc_polling_entity* pollent; 942 bool pollent_added_to_interested_parties; 943 944 // Batches are added to this list when received from above. 945 // They are removed when we are done handling the batch (i.e., when 946 // either we have invoked all of the batch's callbacks or we have 947 // passed the batch down to the subchannel call and are not 948 // intercepting any of its callbacks). 949 pending_batch pending_batches[MAX_PENDING_BATCHES]; 950 bool pending_send_initial_metadata : 1; 951 bool pending_send_message : 1; 952 bool pending_send_trailing_metadata : 1; 953 954 // Retry state. 955 bool enable_retries : 1; 956 bool retry_committed : 1; 957 bool last_attempt_got_server_pushback : 1; 958 int num_attempts_completed; 959 size_t bytes_buffered_for_retry; 960 grpc_core::ManualConstructor<grpc_core::BackOff> retry_backoff; 961 grpc_timer retry_timer; 962 963 // The number of pending retriable subchannel batches containing send ops. 964 // We hold a ref to the call stack while this is non-zero, since replay 965 // batches may not complete until after all callbacks have been returned 966 // to the surface, and we need to make sure that the call is not destroyed 967 // until all of these batches have completed. 968 // Note that we actually only need to track replay batches, but it's 969 // easier to track all batches with send ops. 970 int num_pending_retriable_subchannel_send_batches; 971 972 // Cached data for retrying send ops. 973 // send_initial_metadata 974 bool seen_send_initial_metadata; 975 grpc_linked_mdelem* send_initial_metadata_storage; 976 grpc_metadata_batch send_initial_metadata; 977 uint32_t send_initial_metadata_flags; 978 gpr_atm* peer_string; 979 // send_message 980 // When we get a send_message op, we replace the original byte stream 981 // with a CachingByteStream that caches the slices to a local buffer for 982 // use in retries. 983 // Note: We inline the cache for the first 3 send_message ops and use 984 // dynamic allocation after that. This number was essentially picked 985 // at random; it could be changed in the future to tune performance. 986 grpc_core::ManualConstructor< 987 grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3>> 988 send_messages; 989 // send_trailing_metadata 990 bool seen_send_trailing_metadata; 991 grpc_linked_mdelem* send_trailing_metadata_storage; 992 grpc_metadata_batch send_trailing_metadata; 993 } call_data; 994 995 // Forward declarations. 996 static void retry_commit(grpc_call_element* elem, 997 subchannel_call_retry_state* retry_state); 998 static void start_internal_recv_trailing_metadata(grpc_call_element* elem); 999 static void on_complete(void* arg, grpc_error* error); 1000 static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored); 1001 static void start_pick_locked(void* arg, grpc_error* ignored); 1002 static void maybe_intercept_recv_trailing_metadata_for_channelz( 1003 grpc_call_element* elem, grpc_transport_stream_op_batch* batch); 1004 1005 // 1006 // send op data caching 1007 // 1008 1009 // Caches data for send ops so that it can be retried later, if not 1010 // already cached. 1011 static void maybe_cache_send_ops_for_batch(call_data* calld, 1012 pending_batch* pending) { 1013 if (pending->send_ops_cached) return; 1014 pending->send_ops_cached = true; 1015 grpc_transport_stream_op_batch* batch = pending->batch; 1016 // Save a copy of metadata for send_initial_metadata ops. 1017 if (batch->send_initial_metadata) { 1018 calld->seen_send_initial_metadata = true; 1019 GPR_ASSERT(calld->send_initial_metadata_storage == nullptr); 1020 grpc_metadata_batch* send_initial_metadata = 1021 batch->payload->send_initial_metadata.send_initial_metadata; 1022 calld->send_initial_metadata_storage = (grpc_linked_mdelem*)gpr_arena_alloc( 1023 calld->arena, 1024 sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count); 1025 grpc_metadata_batch_copy(send_initial_metadata, 1026 &calld->send_initial_metadata, 1027 calld->send_initial_metadata_storage); 1028 calld->send_initial_metadata_flags = 1029 batch->payload->send_initial_metadata.send_initial_metadata_flags; 1030 calld->peer_string = batch->payload->send_initial_metadata.peer_string; 1031 } 1032 // Set up cache for send_message ops. 1033 if (batch->send_message) { 1034 grpc_core::ByteStreamCache* cache = 1035 static_cast<grpc_core::ByteStreamCache*>( 1036 gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache))); 1037 new (cache) grpc_core::ByteStreamCache( 1038 std::move(batch->payload->send_message.send_message)); 1039 calld->send_messages->push_back(cache); 1040 } 1041 // Save metadata batch for send_trailing_metadata ops. 1042 if (batch->send_trailing_metadata) { 1043 calld->seen_send_trailing_metadata = true; 1044 GPR_ASSERT(calld->send_trailing_metadata_storage == nullptr); 1045 grpc_metadata_batch* send_trailing_metadata = 1046 batch->payload->send_trailing_metadata.send_trailing_metadata; 1047 calld->send_trailing_metadata_storage = 1048 (grpc_linked_mdelem*)gpr_arena_alloc( 1049 calld->arena, 1050 sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count); 1051 grpc_metadata_batch_copy(send_trailing_metadata, 1052 &calld->send_trailing_metadata, 1053 calld->send_trailing_metadata_storage); 1054 } 1055 } 1056 1057 // Frees cached send_initial_metadata. 1058 static void free_cached_send_initial_metadata(channel_data* chand, 1059 call_data* calld) { 1060 if (grpc_client_channel_trace.enabled()) { 1061 gpr_log(GPR_INFO, 1062 "chand=%p calld=%p: destroying calld->send_initial_metadata", chand, 1063 calld); 1064 } 1065 grpc_metadata_batch_destroy(&calld->send_initial_metadata); 1066 } 1067 1068 // Frees cached send_message at index idx. 1069 static void free_cached_send_message(channel_data* chand, call_data* calld, 1070 size_t idx) { 1071 if (grpc_client_channel_trace.enabled()) { 1072 gpr_log(GPR_INFO, 1073 "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR "]", 1074 chand, calld, idx); 1075 } 1076 (*calld->send_messages)[idx]->Destroy(); 1077 } 1078 1079 // Frees cached send_trailing_metadata. 1080 static void free_cached_send_trailing_metadata(channel_data* chand, 1081 call_data* calld) { 1082 if (grpc_client_channel_trace.enabled()) { 1083 gpr_log(GPR_INFO, 1084 "chand=%p calld=%p: destroying calld->send_trailing_metadata", 1085 chand, calld); 1086 } 1087 grpc_metadata_batch_destroy(&calld->send_trailing_metadata); 1088 } 1089 1090 // Frees cached send ops that have already been completed after 1091 // committing the call. 1092 static void free_cached_send_op_data_after_commit( 1093 grpc_call_element* elem, subchannel_call_retry_state* retry_state) { 1094 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 1095 call_data* calld = static_cast<call_data*>(elem->call_data); 1096 if (retry_state->completed_send_initial_metadata) { 1097 free_cached_send_initial_metadata(chand, calld); 1098 } 1099 for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) { 1100 free_cached_send_message(chand, calld, i); 1101 } 1102 if (retry_state->completed_send_trailing_metadata) { 1103 free_cached_send_trailing_metadata(chand, calld); 1104 } 1105 } 1106 1107 // Frees cached send ops that were completed by the completed batch in 1108 // batch_data. Used when batches are completed after the call is committed. 1109 static void free_cached_send_op_data_for_completed_batch( 1110 grpc_call_element* elem, subchannel_batch_data* batch_data, 1111 subchannel_call_retry_state* retry_state) { 1112 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 1113 call_data* calld = static_cast<call_data*>(elem->call_data); 1114 if (batch_data->batch.send_initial_metadata) { 1115 free_cached_send_initial_metadata(chand, calld); 1116 } 1117 if (batch_data->batch.send_message) { 1118 free_cached_send_message(chand, calld, 1119 retry_state->completed_send_message_count - 1); 1120 } 1121 if (batch_data->batch.send_trailing_metadata) { 1122 free_cached_send_trailing_metadata(chand, calld); 1123 } 1124 } 1125 1126 // 1127 // pending_batches management 1128 // 1129 1130 // Returns the index into calld->pending_batches to be used for batch. 1131 static size_t get_batch_index(grpc_transport_stream_op_batch* batch) { 1132 // Note: It is important the send_initial_metadata be the first entry 1133 // here, since the code in pick_subchannel_locked() assumes it will be. 1134 if (batch->send_initial_metadata) return 0; 1135 if (batch->send_message) return 1; 1136 if (batch->send_trailing_metadata) return 2; 1137 if (batch->recv_initial_metadata) return 3; 1138 if (batch->recv_message) return 4; 1139 if (batch->recv_trailing_metadata) return 5; 1140 GPR_UNREACHABLE_CODE(return (size_t)-1); 1141 } 1142 1143 // This is called via the call combiner, so access to calld is synchronized. 1144 static void pending_batches_add(grpc_call_element* elem, 1145 grpc_transport_stream_op_batch* batch) { 1146 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 1147 call_data* calld = static_cast<call_data*>(elem->call_data); 1148 const size_t idx = get_batch_index(batch); 1149 if (grpc_client_channel_trace.enabled()) { 1150 gpr_log(GPR_INFO, 1151 "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand, 1152 calld, idx); 1153 } 1154 pending_batch* pending = &calld->pending_batches[idx]; 1155 GPR_ASSERT(pending->batch == nullptr); 1156 pending->batch = batch; 1157 pending->send_ops_cached = false; 1158 if (calld->enable_retries) { 1159 // Update state in calld about pending batches. 1160 // Also check if the batch takes us over the retry buffer limit. 1161 // Note: We don't check the size of trailing metadata here, because 1162 // gRPC clients do not send trailing metadata. 1163 if (batch->send_initial_metadata) { 1164 calld->pending_send_initial_metadata = true; 1165 calld->bytes_buffered_for_retry += grpc_metadata_batch_size( 1166 batch->payload->send_initial_metadata.send_initial_metadata); 1167 } 1168 if (batch->send_message) { 1169 calld->pending_send_message = true; 1170 calld->bytes_buffered_for_retry += 1171 batch->payload->send_message.send_message->length(); 1172 } 1173 if (batch->send_trailing_metadata) { 1174 calld->pending_send_trailing_metadata = true; 1175 } 1176 if (GPR_UNLIKELY(calld->bytes_buffered_for_retry > 1177 chand->per_rpc_retry_buffer_size)) { 1178 if (grpc_client_channel_trace.enabled()) { 1179 gpr_log(GPR_INFO, 1180 "chand=%p calld=%p: exceeded retry buffer size, committing", 1181 chand, calld); 1182 } 1183 subchannel_call_retry_state* retry_state = 1184 calld->subchannel_call == nullptr 1185 ? nullptr 1186 : static_cast<subchannel_call_retry_state*>( 1187 grpc_connected_subchannel_call_get_parent_data( 1188 calld->subchannel_call)); 1189 retry_commit(elem, retry_state); 1190 // If we are not going to retry and have not yet started, pretend 1191 // retries are disabled so that we don't bother with retry overhead. 1192 if (calld->num_attempts_completed == 0) { 1193 if (grpc_client_channel_trace.enabled()) { 1194 gpr_log(GPR_INFO, 1195 "chand=%p calld=%p: disabling retries before first attempt", 1196 chand, calld); 1197 } 1198 calld->enable_retries = false; 1199 } 1200 } 1201 } 1202 } 1203 1204 static void pending_batch_clear(call_data* calld, pending_batch* pending) { 1205 if (calld->enable_retries) { 1206 if (pending->batch->send_initial_metadata) { 1207 calld->pending_send_initial_metadata = false; 1208 } 1209 if (pending->batch->send_message) { 1210 calld->pending_send_message = false; 1211 } 1212 if (pending->batch->send_trailing_metadata) { 1213 calld->pending_send_trailing_metadata = false; 1214 } 1215 } 1216 pending->batch = nullptr; 1217 } 1218 1219 // This is called via the call combiner, so access to calld is synchronized. 1220 static void fail_pending_batch_in_call_combiner(void* arg, grpc_error* error) { 1221 grpc_transport_stream_op_batch* batch = 1222 static_cast<grpc_transport_stream_op_batch*>(arg); 1223 call_data* calld = static_cast<call_data*>(batch->handler_private.extra_arg); 1224 // Note: This will release the call combiner. 1225 grpc_transport_stream_op_batch_finish_with_failure( 1226 batch, GRPC_ERROR_REF(error), calld->call_combiner); 1227 } 1228 1229 // This is called via the call combiner, so access to calld is synchronized. 1230 // If yield_call_combiner is true, assumes responsibility for yielding 1231 // the call combiner. 1232 static void pending_batches_fail(grpc_call_element* elem, grpc_error* error, 1233 bool yield_call_combiner) { 1234 GPR_ASSERT(error != GRPC_ERROR_NONE); 1235 call_data* calld = static_cast<call_data*>(elem->call_data); 1236 if (grpc_client_channel_trace.enabled()) { 1237 size_t num_batches = 0; 1238 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { 1239 if (calld->pending_batches[i].batch != nullptr) ++num_batches; 1240 } 1241 gpr_log(GPR_INFO, 1242 "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s", 1243 elem->channel_data, calld, num_batches, grpc_error_string(error)); 1244 } 1245 grpc_core::CallCombinerClosureList closures; 1246 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { 1247 pending_batch* pending = &calld->pending_batches[i]; 1248 grpc_transport_stream_op_batch* batch = pending->batch; 1249 if (batch != nullptr) { 1250 batch->handler_private.extra_arg = calld; 1251 GRPC_CLOSURE_INIT(&batch->handler_private.closure, 1252 fail_pending_batch_in_call_combiner, batch, 1253 grpc_schedule_on_exec_ctx); 1254 closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error), 1255 "pending_batches_fail"); 1256 pending_batch_clear(calld, pending); 1257 } 1258 } 1259 if (yield_call_combiner) { 1260 closures.RunClosures(calld->call_combiner); 1261 } else { 1262 closures.RunClosuresWithoutYielding(calld->call_combiner); 1263 } 1264 GRPC_ERROR_UNREF(error); 1265 } 1266 1267 // This is called via the call combiner, so access to calld is synchronized. 1268 static void resume_pending_batch_in_call_combiner(void* arg, 1269 grpc_error* ignored) { 1270 grpc_transport_stream_op_batch* batch = 1271 static_cast<grpc_transport_stream_op_batch*>(arg); 1272 grpc_subchannel_call* subchannel_call = 1273 static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg); 1274 // Note: This will release the call combiner. 1275 grpc_subchannel_call_process_op(subchannel_call, batch); 1276 } 1277 1278 // This is called via the call combiner, so access to calld is synchronized. 1279 static void pending_batches_resume(grpc_call_element* elem) { 1280 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 1281 call_data* calld = static_cast<call_data*>(elem->call_data); 1282 if (calld->enable_retries) { 1283 start_retriable_subchannel_batches(elem, GRPC_ERROR_NONE); 1284 return; 1285 } 1286 // Retries not enabled; send down batches as-is. 1287 if (grpc_client_channel_trace.enabled()) { 1288 size_t num_batches = 0; 1289 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { 1290 if (calld->pending_batches[i].batch != nullptr) ++num_batches; 1291 } 1292 gpr_log(GPR_INFO, 1293 "chand=%p calld=%p: starting %" PRIuPTR 1294 " pending batches on subchannel_call=%p", 1295 chand, calld, num_batches, calld->subchannel_call); 1296 } 1297 grpc_core::CallCombinerClosureList closures; 1298 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { 1299 pending_batch* pending = &calld->pending_batches[i]; 1300 grpc_transport_stream_op_batch* batch = pending->batch; 1301 if (batch != nullptr) { 1302 maybe_intercept_recv_trailing_metadata_for_channelz(elem, batch); 1303 batch->handler_private.extra_arg = calld->subchannel_call; 1304 GRPC_CLOSURE_INIT(&batch->handler_private.closure, 1305 resume_pending_batch_in_call_combiner, batch, 1306 grpc_schedule_on_exec_ctx); 1307 closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE, 1308 "pending_batches_resume"); 1309 pending_batch_clear(calld, pending); 1310 } 1311 } 1312 // Note: This will release the call combiner. 1313 closures.RunClosures(calld->call_combiner); 1314 } 1315 1316 static void maybe_clear_pending_batch(grpc_call_element* elem, 1317 pending_batch* pending) { 1318 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 1319 call_data* calld = static_cast<call_data*>(elem->call_data); 1320 grpc_transport_stream_op_batch* batch = pending->batch; 1321 // We clear the pending batch if all of its callbacks have been 1322 // scheduled and reset to nullptr. 1323 if (batch->on_complete == nullptr && 1324 (!batch->recv_initial_metadata || 1325 batch->payload->recv_initial_metadata.recv_initial_metadata_ready == 1326 nullptr) && 1327 (!batch->recv_message || 1328 batch->payload->recv_message.recv_message_ready == nullptr) && 1329 (!batch->recv_trailing_metadata || 1330 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready == 1331 nullptr)) { 1332 if (grpc_client_channel_trace.enabled()) { 1333 gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand, 1334 calld); 1335 } 1336 pending_batch_clear(calld, pending); 1337 } 1338 } 1339 1340 // Returns a pointer to the first pending batch for which predicate(batch) 1341 // returns true, or null if not found. 1342 template <typename Predicate> 1343 static pending_batch* pending_batch_find(grpc_call_element* elem, 1344 const char* log_message, 1345 Predicate predicate) { 1346 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 1347 call_data* calld = static_cast<call_data*>(elem->call_data); 1348 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { 1349 pending_batch* pending = &calld->pending_batches[i]; 1350 grpc_transport_stream_op_batch* batch = pending->batch; 1351 if (batch != nullptr && predicate(batch)) { 1352 if (grpc_client_channel_trace.enabled()) { 1353 gpr_log(GPR_INFO, 1354 "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand, 1355 calld, log_message, i); 1356 } 1357 return pending; 1358 } 1359 } 1360 return nullptr; 1361 } 1362 1363 // 1364 // retry code 1365 // 1366 1367 // Commits the call so that no further retry attempts will be performed. 1368 static void retry_commit(grpc_call_element* elem, 1369 subchannel_call_retry_state* retry_state) { 1370 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 1371 call_data* calld = static_cast<call_data*>(elem->call_data); 1372 if (calld->retry_committed) return; 1373 calld->retry_committed = true; 1374 if (grpc_client_channel_trace.enabled()) { 1375 gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand, calld); 1376 } 1377 if (retry_state != nullptr) { 1378 free_cached_send_op_data_after_commit(elem, retry_state); 1379 } 1380 } 1381 1382 // Starts a retry after appropriate back-off. 1383 static void do_retry(grpc_call_element* elem, 1384 subchannel_call_retry_state* retry_state, 1385 grpc_millis server_pushback_ms) { 1386 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 1387 call_data* calld = static_cast<call_data*>(elem->call_data); 1388 GPR_ASSERT(calld->method_params != nullptr); 1389 const ClientChannelMethodParams::RetryPolicy* retry_policy = 1390 calld->method_params->retry_policy(); 1391 GPR_ASSERT(retry_policy != nullptr); 1392 // Reset subchannel call and connected subchannel. 1393 if (calld->subchannel_call != nullptr) { 1394 GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call, 1395 "client_channel_call_retry"); 1396 calld->subchannel_call = nullptr; 1397 } 1398 if (calld->pick.connected_subchannel != nullptr) { 1399 calld->pick.connected_subchannel.reset(); 1400 } 1401 // Compute backoff delay. 1402 grpc_millis next_attempt_time; 1403 if (server_pushback_ms >= 0) { 1404 next_attempt_time = grpc_core::ExecCtx::Get()->Now() + server_pushback_ms; 1405 calld->last_attempt_got_server_pushback = true; 1406 } else { 1407 if (calld->num_attempts_completed == 1 || 1408 calld->last_attempt_got_server_pushback) { 1409 calld->retry_backoff.Init( 1410 grpc_core::BackOff::Options() 1411 .set_initial_backoff(retry_policy->initial_backoff) 1412 .set_multiplier(retry_policy->backoff_multiplier) 1413 .set_jitter(RETRY_BACKOFF_JITTER) 1414 .set_max_backoff(retry_policy->max_backoff)); 1415 calld->last_attempt_got_server_pushback = false; 1416 } 1417 next_attempt_time = calld->retry_backoff->NextAttemptTime(); 1418 } 1419 if (grpc_client_channel_trace.enabled()) { 1420 gpr_log(GPR_INFO, 1421 "chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand, 1422 calld, next_attempt_time - grpc_core::ExecCtx::Get()->Now()); 1423 } 1424 // Schedule retry after computed delay. 1425 GRPC_CLOSURE_INIT(&calld->pick_closure, start_pick_locked, elem, 1426 grpc_combiner_scheduler(chand->combiner)); 1427 grpc_timer_init(&calld->retry_timer, next_attempt_time, &calld->pick_closure); 1428 // Update bookkeeping. 1429 if (retry_state != nullptr) retry_state->retry_dispatched = true; 1430 } 1431 1432 // Returns true if the call is being retried. 1433 static bool maybe_retry(grpc_call_element* elem, 1434 subchannel_batch_data* batch_data, 1435 grpc_status_code status, 1436 grpc_mdelem* server_pushback_md) { 1437 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 1438 call_data* calld = static_cast<call_data*>(elem->call_data); 1439 // Get retry policy. 1440 if (calld->method_params == nullptr) return false; 1441 const ClientChannelMethodParams::RetryPolicy* retry_policy = 1442 calld->method_params->retry_policy(); 1443 if (retry_policy == nullptr) return false; 1444 // If we've already dispatched a retry from this call, return true. 1445 // This catches the case where the batch has multiple callbacks 1446 // (i.e., it includes either recv_message or recv_initial_metadata). 1447 subchannel_call_retry_state* retry_state = nullptr; 1448 if (batch_data != nullptr) { 1449 retry_state = static_cast<subchannel_call_retry_state*>( 1450 grpc_connected_subchannel_call_get_parent_data( 1451 batch_data->subchannel_call)); 1452 if (retry_state->retry_dispatched) { 1453 if (grpc_client_channel_trace.enabled()) { 1454 gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", chand, 1455 calld); 1456 } 1457 return true; 1458 } 1459 } 1460 // Check status. 1461 if (GPR_LIKELY(status == GRPC_STATUS_OK)) { 1462 if (calld->retry_throttle_data != nullptr) { 1463 calld->retry_throttle_data->RecordSuccess(); 1464 } 1465 if (grpc_client_channel_trace.enabled()) { 1466 gpr_log(GPR_INFO, "chand=%p calld=%p: call succeeded", chand, calld); 1467 } 1468 return false; 1469 } 1470 // Status is not OK. Check whether the status is retryable. 1471 if (!retry_policy->retryable_status_codes.Contains(status)) { 1472 if (grpc_client_channel_trace.enabled()) { 1473 gpr_log(GPR_INFO, 1474 "chand=%p calld=%p: status %s not configured as retryable", chand, 1475 calld, grpc_status_code_to_string(status)); 1476 } 1477 return false; 1478 } 1479 // Record the failure and check whether retries are throttled. 1480 // Note that it's important for this check to come after the status 1481 // code check above, since we should only record failures whose statuses 1482 // match the configured retryable status codes, so that we don't count 1483 // things like failures due to malformed requests (INVALID_ARGUMENT). 1484 // Conversely, it's important for this to come before the remaining 1485 // checks, so that we don't fail to record failures due to other factors. 1486 if (calld->retry_throttle_data != nullptr && 1487 !calld->retry_throttle_data->RecordFailure()) { 1488 if (grpc_client_channel_trace.enabled()) { 1489 gpr_log(GPR_INFO, "chand=%p calld=%p: retries throttled", chand, calld); 1490 } 1491 return false; 1492 } 1493 // Check whether the call is committed. 1494 if (calld->retry_committed) { 1495 if (grpc_client_channel_trace.enabled()) { 1496 gpr_log(GPR_INFO, "chand=%p calld=%p: retries already committed", chand, 1497 calld); 1498 } 1499 return false; 1500 } 1501 // Check whether we have retries remaining. 1502 ++calld->num_attempts_completed; 1503 if (calld->num_attempts_completed >= retry_policy->max_attempts) { 1504 if (grpc_client_channel_trace.enabled()) { 1505 gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded %d retry attempts", chand, 1506 calld, retry_policy->max_attempts); 1507 } 1508 return false; 1509 } 1510 // If the call was cancelled from the surface, don't retry. 1511 if (calld->cancel_error != GRPC_ERROR_NONE) { 1512 if (grpc_client_channel_trace.enabled()) { 1513 gpr_log(GPR_INFO, 1514 "chand=%p calld=%p: call cancelled from surface, not retrying", 1515 chand, calld); 1516 } 1517 return false; 1518 } 1519 // Check server push-back. 1520 grpc_millis server_pushback_ms = -1; 1521 if (server_pushback_md != nullptr) { 1522 // If the value is "-1" or any other unparseable string, we do not retry. 1523 uint32_t ms; 1524 if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) { 1525 if (grpc_client_channel_trace.enabled()) { 1526 gpr_log(GPR_INFO, 1527 "chand=%p calld=%p: not retrying due to server push-back", 1528 chand, calld); 1529 } 1530 return false; 1531 } else { 1532 if (grpc_client_channel_trace.enabled()) { 1533 gpr_log(GPR_INFO, "chand=%p calld=%p: server push-back: retry in %u ms", 1534 chand, calld, ms); 1535 } 1536 server_pushback_ms = (grpc_millis)ms; 1537 } 1538 } 1539 do_retry(elem, retry_state, server_pushback_ms); 1540 return true; 1541 } 1542 1543 // 1544 // subchannel_batch_data 1545 // 1546 1547 // Creates a subchannel_batch_data object on the call's arena with the 1548 // specified refcount. If set_on_complete is true, the batch's 1549 // on_complete callback will be set to point to on_complete(); 1550 // otherwise, the batch's on_complete callback will be null. 1551 static subchannel_batch_data* batch_data_create(grpc_call_element* elem, 1552 int refcount, 1553 bool set_on_complete) { 1554 call_data* calld = static_cast<call_data*>(elem->call_data); 1555 subchannel_call_retry_state* retry_state = 1556 static_cast<subchannel_call_retry_state*>( 1557 grpc_connected_subchannel_call_get_parent_data( 1558 calld->subchannel_call)); 1559 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>( 1560 gpr_arena_alloc(calld->arena, sizeof(*batch_data))); 1561 batch_data->elem = elem; 1562 batch_data->subchannel_call = 1563 GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call, "batch_data_create"); 1564 batch_data->batch.payload = &retry_state->batch_payload; 1565 gpr_ref_init(&batch_data->refs, refcount); 1566 if (set_on_complete) { 1567 GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data, 1568 grpc_schedule_on_exec_ctx); 1569 batch_data->batch.on_complete = &batch_data->on_complete; 1570 } 1571 GRPC_CALL_STACK_REF(calld->owning_call, "batch_data"); 1572 return batch_data; 1573 } 1574 1575 static void batch_data_unref(subchannel_batch_data* batch_data) { 1576 if (gpr_unref(&batch_data->refs)) { 1577 subchannel_call_retry_state* retry_state = 1578 static_cast<subchannel_call_retry_state*>( 1579 grpc_connected_subchannel_call_get_parent_data( 1580 batch_data->subchannel_call)); 1581 if (batch_data->batch.send_initial_metadata) { 1582 grpc_metadata_batch_destroy(&retry_state->send_initial_metadata); 1583 } 1584 if (batch_data->batch.send_trailing_metadata) { 1585 grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata); 1586 } 1587 if (batch_data->batch.recv_initial_metadata) { 1588 grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata); 1589 } 1590 if (batch_data->batch.recv_trailing_metadata) { 1591 grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata); 1592 } 1593 GRPC_SUBCHANNEL_CALL_UNREF(batch_data->subchannel_call, "batch_data_unref"); 1594 call_data* calld = static_cast<call_data*>(batch_data->elem->call_data); 1595 GRPC_CALL_STACK_UNREF(calld->owning_call, "batch_data"); 1596 } 1597 } 1598 1599 // 1600 // recv_initial_metadata callback handling 1601 // 1602 1603 // Invokes recv_initial_metadata_ready for a subchannel batch. 1604 static void invoke_recv_initial_metadata_callback(void* arg, 1605 grpc_error* error) { 1606 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg); 1607 // Find pending batch. 1608 pending_batch* pending = pending_batch_find( 1609 batch_data->elem, "invoking recv_initial_metadata_ready for", 1610 [](grpc_transport_stream_op_batch* batch) { 1611 return batch->recv_initial_metadata && 1612 batch->payload->recv_initial_metadata 1613 .recv_initial_metadata_ready != nullptr; 1614 }); 1615 GPR_ASSERT(pending != nullptr); 1616 // Return metadata. 1617 subchannel_call_retry_state* retry_state = 1618 static_cast<subchannel_call_retry_state*>( 1619 grpc_connected_subchannel_call_get_parent_data( 1620 batch_data->subchannel_call)); 1621 grpc_metadata_batch_move( 1622 &retry_state->recv_initial_metadata, 1623 pending->batch->payload->recv_initial_metadata.recv_initial_metadata); 1624 // Update bookkeeping. 1625 // Note: Need to do this before invoking the callback, since invoking 1626 // the callback will result in yielding the call combiner. 1627 grpc_closure* recv_initial_metadata_ready = 1628 pending->batch->payload->recv_initial_metadata 1629 .recv_initial_metadata_ready; 1630 pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready = 1631 nullptr; 1632 maybe_clear_pending_batch(batch_data->elem, pending); 1633 batch_data_unref(batch_data); 1634 // Invoke callback. 1635 GRPC_CLOSURE_RUN(recv_initial_metadata_ready, GRPC_ERROR_REF(error)); 1636 } 1637 1638 // Intercepts recv_initial_metadata_ready callback for retries. 1639 // Commits the call and returns the initial metadata up the stack. 1640 static void recv_initial_metadata_ready(void* arg, grpc_error* error) { 1641 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg); 1642 grpc_call_element* elem = batch_data->elem; 1643 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 1644 call_data* calld = static_cast<call_data*>(elem->call_data); 1645 if (grpc_client_channel_trace.enabled()) { 1646 gpr_log(GPR_INFO, 1647 "chand=%p calld=%p: got recv_initial_metadata_ready, error=%s", 1648 chand, calld, grpc_error_string(error)); 1649 } 1650 subchannel_call_retry_state* retry_state = 1651 static_cast<subchannel_call_retry_state*>( 1652 grpc_connected_subchannel_call_get_parent_data( 1653 batch_data->subchannel_call)); 1654 retry_state->completed_recv_initial_metadata = true; 1655 // If a retry was already dispatched, then we're not going to use the 1656 // result of this recv_initial_metadata op, so do nothing. 1657 if (retry_state->retry_dispatched) { 1658 GRPC_CALL_COMBINER_STOP( 1659 calld->call_combiner, 1660 "recv_initial_metadata_ready after retry dispatched"); 1661 return; 1662 } 1663 // If we got an error or a Trailers-Only response and have not yet gotten 1664 // the recv_trailing_metadata_ready callback, then defer propagating this 1665 // callback back to the surface. We can evaluate whether to retry when 1666 // recv_trailing_metadata comes back. 1667 if (GPR_UNLIKELY((retry_state->trailing_metadata_available || 1668 error != GRPC_ERROR_NONE) && 1669 !retry_state->completed_recv_trailing_metadata)) { 1670 if (grpc_client_channel_trace.enabled()) { 1671 gpr_log(GPR_INFO, 1672 "chand=%p calld=%p: deferring recv_initial_metadata_ready " 1673 "(Trailers-Only)", 1674 chand, calld); 1675 } 1676 retry_state->recv_initial_metadata_ready_deferred_batch = batch_data; 1677 retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error); 1678 if (!retry_state->started_recv_trailing_metadata) { 1679 // recv_trailing_metadata not yet started by application; start it 1680 // ourselves to get status. 1681 start_internal_recv_trailing_metadata(elem); 1682 } else { 1683 GRPC_CALL_COMBINER_STOP( 1684 calld->call_combiner, 1685 "recv_initial_metadata_ready trailers-only or error"); 1686 } 1687 return; 1688 } 1689 // Received valid initial metadata, so commit the call. 1690 retry_commit(elem, retry_state); 1691 // Invoke the callback to return the result to the surface. 1692 // Manually invoking a callback function; it does not take ownership of error. 1693 invoke_recv_initial_metadata_callback(batch_data, error); 1694 } 1695 1696 // 1697 // recv_message callback handling 1698 // 1699 1700 // Invokes recv_message_ready for a subchannel batch. 1701 static void invoke_recv_message_callback(void* arg, grpc_error* error) { 1702 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg); 1703 // Find pending op. 1704 pending_batch* pending = pending_batch_find( 1705 batch_data->elem, "invoking recv_message_ready for", 1706 [](grpc_transport_stream_op_batch* batch) { 1707 return batch->recv_message && 1708 batch->payload->recv_message.recv_message_ready != nullptr; 1709 }); 1710 GPR_ASSERT(pending != nullptr); 1711 // Return payload. 1712 subchannel_call_retry_state* retry_state = 1713 static_cast<subchannel_call_retry_state*>( 1714 grpc_connected_subchannel_call_get_parent_data( 1715 batch_data->subchannel_call)); 1716 *pending->batch->payload->recv_message.recv_message = 1717 std::move(retry_state->recv_message); 1718 // Update bookkeeping. 1719 // Note: Need to do this before invoking the callback, since invoking 1720 // the callback will result in yielding the call combiner. 1721 grpc_closure* recv_message_ready = 1722 pending->batch->payload->recv_message.recv_message_ready; 1723 pending->batch->payload->recv_message.recv_message_ready = nullptr; 1724 maybe_clear_pending_batch(batch_data->elem, pending); 1725 batch_data_unref(batch_data); 1726 // Invoke callback. 1727 GRPC_CLOSURE_RUN(recv_message_ready, GRPC_ERROR_REF(error)); 1728 } 1729 1730 // Intercepts recv_message_ready callback for retries. 1731 // Commits the call and returns the message up the stack. 1732 static void recv_message_ready(void* arg, grpc_error* error) { 1733 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg); 1734 grpc_call_element* elem = batch_data->elem; 1735 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 1736 call_data* calld = static_cast<call_data*>(elem->call_data); 1737 if (grpc_client_channel_trace.enabled()) { 1738 gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_message_ready, error=%s", 1739 chand, calld, grpc_error_string(error)); 1740 } 1741 subchannel_call_retry_state* retry_state = 1742 static_cast<subchannel_call_retry_state*>( 1743 grpc_connected_subchannel_call_get_parent_data( 1744 batch_data->subchannel_call)); 1745 ++retry_state->completed_recv_message_count; 1746 // If a retry was already dispatched, then we're not going to use the 1747 // result of this recv_message op, so do nothing. 1748 if (retry_state->retry_dispatched) { 1749 GRPC_CALL_COMBINER_STOP(calld->call_combiner, 1750 "recv_message_ready after retry dispatched"); 1751 return; 1752 } 1753 // If we got an error or the payload was nullptr and we have not yet gotten 1754 // the recv_trailing_metadata_ready callback, then defer propagating this 1755 // callback back to the surface. We can evaluate whether to retry when 1756 // recv_trailing_metadata comes back. 1757 if (GPR_UNLIKELY( 1758 (retry_state->recv_message == nullptr || error != GRPC_ERROR_NONE) && 1759 !retry_state->completed_recv_trailing_metadata)) { 1760 if (grpc_client_channel_trace.enabled()) { 1761 gpr_log(GPR_INFO, 1762 "chand=%p calld=%p: deferring recv_message_ready (nullptr " 1763 "message and recv_trailing_metadata pending)", 1764 chand, calld); 1765 } 1766 retry_state->recv_message_ready_deferred_batch = batch_data; 1767 retry_state->recv_message_error = GRPC_ERROR_REF(error); 1768 if (!retry_state->started_recv_trailing_metadata) { 1769 // recv_trailing_metadata not yet started by application; start it 1770 // ourselves to get status. 1771 start_internal_recv_trailing_metadata(elem); 1772 } else { 1773 GRPC_CALL_COMBINER_STOP(calld->call_combiner, "recv_message_ready null"); 1774 } 1775 return; 1776 } 1777 // Received a valid message, so commit the call. 1778 retry_commit(elem, retry_state); 1779 // Invoke the callback to return the result to the surface. 1780 // Manually invoking a callback function; it does not take ownership of error. 1781 invoke_recv_message_callback(batch_data, error); 1782 } 1783 1784 // 1785 // recv_trailing_metadata handling 1786 // 1787 1788 // Sets *status and *server_pushback_md based on md_batch and error. 1789 // Only sets *server_pushback_md if server_pushback_md != nullptr. 1790 static void get_call_status(grpc_call_element* elem, 1791 grpc_metadata_batch* md_batch, grpc_error* error, 1792 grpc_status_code* status, 1793 grpc_mdelem** server_pushback_md) { 1794 call_data* calld = static_cast<call_data*>(elem->call_data); 1795 if (error != GRPC_ERROR_NONE) { 1796 grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr, 1797 nullptr); 1798 } else { 1799 GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr); 1800 *status = 1801 grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md); 1802 if (server_pushback_md != nullptr && 1803 md_batch->idx.named.grpc_retry_pushback_ms != nullptr) { 1804 *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md; 1805 } 1806 } 1807 GRPC_ERROR_UNREF(error); 1808 } 1809 1810 // Adds recv_trailing_metadata_ready closure to closures. 1811 static void add_closure_for_recv_trailing_metadata_ready( 1812 grpc_call_element* elem, subchannel_batch_data* batch_data, 1813 grpc_error* error, grpc_core::CallCombinerClosureList* closures) { 1814 // Find pending batch. 1815 pending_batch* pending = pending_batch_find( 1816 elem, "invoking recv_trailing_metadata for", 1817 [](grpc_transport_stream_op_batch* batch) { 1818 return batch->recv_trailing_metadata && 1819 batch->payload->recv_trailing_metadata 1820 .recv_trailing_metadata_ready != nullptr; 1821 }); 1822 // If we generated the recv_trailing_metadata op internally via 1823 // start_internal_recv_trailing_metadata(), then there will be no 1824 // pending batch. 1825 if (pending == nullptr) { 1826 GRPC_ERROR_UNREF(error); 1827 return; 1828 } 1829 // Return metadata. 1830 subchannel_call_retry_state* retry_state = 1831 static_cast<subchannel_call_retry_state*>( 1832 grpc_connected_subchannel_call_get_parent_data( 1833 batch_data->subchannel_call)); 1834 grpc_metadata_batch_move( 1835 &retry_state->recv_trailing_metadata, 1836 pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata); 1837 // Add closure. 1838 closures->Add(pending->batch->payload->recv_trailing_metadata 1839 .recv_trailing_metadata_ready, 1840 error, "recv_trailing_metadata_ready for pending batch"); 1841 // Update bookkeeping. 1842 pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = 1843 nullptr; 1844 maybe_clear_pending_batch(elem, pending); 1845 } 1846 1847 // Adds any necessary closures for deferred recv_initial_metadata and 1848 // recv_message callbacks to closures. 1849 static void add_closures_for_deferred_recv_callbacks( 1850 subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state, 1851 grpc_core::CallCombinerClosureList* closures) { 1852 if (batch_data->batch.recv_trailing_metadata) { 1853 // Add closure for deferred recv_initial_metadata_ready. 1854 if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch != 1855 nullptr)) { 1856 GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready, 1857 invoke_recv_initial_metadata_callback, 1858 retry_state->recv_initial_metadata_ready_deferred_batch, 1859 grpc_schedule_on_exec_ctx); 1860 closures->Add(&retry_state->recv_initial_metadata_ready, 1861 retry_state->recv_initial_metadata_error, 1862 "resuming recv_initial_metadata_ready"); 1863 retry_state->recv_initial_metadata_ready_deferred_batch = nullptr; 1864 } 1865 // Add closure for deferred recv_message_ready. 1866 if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch != 1867 nullptr)) { 1868 GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, 1869 invoke_recv_message_callback, 1870 retry_state->recv_message_ready_deferred_batch, 1871 grpc_schedule_on_exec_ctx); 1872 closures->Add(&retry_state->recv_message_ready, 1873 retry_state->recv_message_error, 1874 "resuming recv_message_ready"); 1875 retry_state->recv_message_ready_deferred_batch = nullptr; 1876 } 1877 } 1878 } 1879 1880 // Returns true if any op in the batch was not yet started. 1881 // Only looks at send ops, since recv ops are always started immediately. 1882 static bool pending_batch_is_unstarted( 1883 pending_batch* pending, call_data* calld, 1884 subchannel_call_retry_state* retry_state) { 1885 if (pending->batch == nullptr || pending->batch->on_complete == nullptr) { 1886 return false; 1887 } 1888 if (pending->batch->send_initial_metadata && 1889 !retry_state->started_send_initial_metadata) { 1890 return true; 1891 } 1892 if (pending->batch->send_message && 1893 retry_state->started_send_message_count < calld->send_messages->size()) { 1894 return true; 1895 } 1896 if (pending->batch->send_trailing_metadata && 1897 !retry_state->started_send_trailing_metadata) { 1898 return true; 1899 } 1900 return false; 1901 } 1902 1903 // For any pending batch containing an op that has not yet been started, 1904 // adds the pending batch's completion closures to closures. 1905 static void add_closures_to_fail_unstarted_pending_batches( 1906 grpc_call_element* elem, subchannel_call_retry_state* retry_state, 1907 grpc_error* error, grpc_core::CallCombinerClosureList* closures) { 1908 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 1909 call_data* calld = static_cast<call_data*>(elem->call_data); 1910 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { 1911 pending_batch* pending = &calld->pending_batches[i]; 1912 if (pending_batch_is_unstarted(pending, calld, retry_state)) { 1913 if (grpc_client_channel_trace.enabled()) { 1914 gpr_log(GPR_INFO, 1915 "chand=%p calld=%p: failing unstarted pending batch at index " 1916 "%" PRIuPTR, 1917 chand, calld, i); 1918 } 1919 closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error), 1920 "failing on_complete for pending batch"); 1921 pending->batch->on_complete = nullptr; 1922 maybe_clear_pending_batch(elem, pending); 1923 } 1924 } 1925 GRPC_ERROR_UNREF(error); 1926 } 1927 1928 // Runs necessary closures upon completion of a call attempt. 1929 static void run_closures_for_completed_call(subchannel_batch_data* batch_data, 1930 grpc_error* error) { 1931 grpc_call_element* elem = batch_data->elem; 1932 call_data* calld = static_cast<call_data*>(elem->call_data); 1933 subchannel_call_retry_state* retry_state = 1934 static_cast<subchannel_call_retry_state*>( 1935 grpc_connected_subchannel_call_get_parent_data( 1936 batch_data->subchannel_call)); 1937 // Construct list of closures to execute. 1938 grpc_core::CallCombinerClosureList closures; 1939 // First, add closure for recv_trailing_metadata_ready. 1940 add_closure_for_recv_trailing_metadata_ready( 1941 elem, batch_data, GRPC_ERROR_REF(error), &closures); 1942 // If there are deferred recv_initial_metadata_ready or recv_message_ready 1943 // callbacks, add them to closures. 1944 add_closures_for_deferred_recv_callbacks(batch_data, retry_state, &closures); 1945 // Add closures to fail any pending batches that have not yet been started. 1946 add_closures_to_fail_unstarted_pending_batches( 1947 elem, retry_state, GRPC_ERROR_REF(error), &closures); 1948 // Don't need batch_data anymore. 1949 batch_data_unref(batch_data); 1950 // Schedule all of the closures identified above. 1951 // Note: This will release the call combiner. 1952 closures.RunClosures(calld->call_combiner); 1953 GRPC_ERROR_UNREF(error); 1954 } 1955 1956 // Intercepts recv_trailing_metadata_ready callback for retries. 1957 // Commits the call and returns the trailing metadata up the stack. 1958 static void recv_trailing_metadata_ready(void* arg, grpc_error* error) { 1959 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg); 1960 grpc_call_element* elem = batch_data->elem; 1961 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 1962 call_data* calld = static_cast<call_data*>(elem->call_data); 1963 if (grpc_client_channel_trace.enabled()) { 1964 gpr_log(GPR_INFO, 1965 "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s", 1966 chand, calld, grpc_error_string(error)); 1967 } 1968 subchannel_call_retry_state* retry_state = 1969 static_cast<subchannel_call_retry_state*>( 1970 grpc_connected_subchannel_call_get_parent_data( 1971 batch_data->subchannel_call)); 1972 retry_state->completed_recv_trailing_metadata = true; 1973 // Get the call's status and check for server pushback metadata. 1974 grpc_status_code status = GRPC_STATUS_OK; 1975 grpc_mdelem* server_pushback_md = nullptr; 1976 grpc_metadata_batch* md_batch = 1977 batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata; 1978 get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, 1979 &server_pushback_md); 1980 grpc_core::channelz::SubchannelNode* channelz_subchannel = 1981 calld->pick.connected_subchannel->channelz_subchannel(); 1982 if (channelz_subchannel != nullptr) { 1983 if (status == GRPC_STATUS_OK) { 1984 channelz_subchannel->RecordCallSucceeded(); 1985 } else { 1986 channelz_subchannel->RecordCallFailed(); 1987 } 1988 } 1989 if (grpc_client_channel_trace.enabled()) { 1990 gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand, 1991 calld, grpc_status_code_to_string(status)); 1992 } 1993 // Check if we should retry. 1994 if (maybe_retry(elem, batch_data, status, server_pushback_md)) { 1995 // Unref batch_data for deferred recv_initial_metadata_ready or 1996 // recv_message_ready callbacks, if any. 1997 if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) { 1998 batch_data_unref(batch_data); 1999 GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error); 2000 } 2001 if (retry_state->recv_message_ready_deferred_batch != nullptr) { 2002 batch_data_unref(batch_data); 2003 GRPC_ERROR_UNREF(retry_state->recv_message_error); 2004 } 2005 batch_data_unref(batch_data); 2006 return; 2007 } 2008 // Not retrying, so commit the call. 2009 retry_commit(elem, retry_state); 2010 // Run any necessary closures. 2011 run_closures_for_completed_call(batch_data, GRPC_ERROR_REF(error)); 2012 } 2013 2014 // 2015 // on_complete callback handling 2016 // 2017 2018 // Adds the on_complete closure for the pending batch completed in 2019 // batch_data to closures. 2020 static void add_closure_for_completed_pending_batch( 2021 grpc_call_element* elem, subchannel_batch_data* batch_data, 2022 subchannel_call_retry_state* retry_state, grpc_error* error, 2023 grpc_core::CallCombinerClosureList* closures) { 2024 pending_batch* pending = pending_batch_find( 2025 elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) { 2026 // Match the pending batch with the same set of send ops as the 2027 // subchannel batch we've just completed. 2028 return batch->on_complete != nullptr && 2029 batch_data->batch.send_initial_metadata == 2030 batch->send_initial_metadata && 2031 batch_data->batch.send_message == batch->send_message && 2032 batch_data->batch.send_trailing_metadata == 2033 batch->send_trailing_metadata; 2034 }); 2035 // If batch_data is a replay batch, then there will be no pending 2036 // batch to complete. 2037 if (pending == nullptr) { 2038 GRPC_ERROR_UNREF(error); 2039 return; 2040 } 2041 // Add closure. 2042 closures->Add(pending->batch->on_complete, error, 2043 "on_complete for pending batch"); 2044 pending->batch->on_complete = nullptr; 2045 maybe_clear_pending_batch(elem, pending); 2046 } 2047 2048 // If there are any cached ops to replay or pending ops to start on the 2049 // subchannel call, adds a closure to closures to invoke 2050 // start_retriable_subchannel_batches(). 2051 static void add_closures_for_replay_or_pending_send_ops( 2052 grpc_call_element* elem, subchannel_batch_data* batch_data, 2053 subchannel_call_retry_state* retry_state, 2054 grpc_core::CallCombinerClosureList* closures) { 2055 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 2056 call_data* calld = static_cast<call_data*>(elem->call_data); 2057 bool have_pending_send_message_ops = 2058 retry_state->started_send_message_count < calld->send_messages->size(); 2059 bool have_pending_send_trailing_metadata_op = 2060 calld->seen_send_trailing_metadata && 2061 !retry_state->started_send_trailing_metadata; 2062 if (!have_pending_send_message_ops && 2063 !have_pending_send_trailing_metadata_op) { 2064 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { 2065 pending_batch* pending = &calld->pending_batches[i]; 2066 grpc_transport_stream_op_batch* batch = pending->batch; 2067 if (batch == nullptr || pending->send_ops_cached) continue; 2068 if (batch->send_message) have_pending_send_message_ops = true; 2069 if (batch->send_trailing_metadata) { 2070 have_pending_send_trailing_metadata_op = true; 2071 } 2072 } 2073 } 2074 if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) { 2075 if (grpc_client_channel_trace.enabled()) { 2076 gpr_log(GPR_INFO, 2077 "chand=%p calld=%p: starting next batch for pending send op(s)", 2078 chand, calld); 2079 } 2080 GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure, 2081 start_retriable_subchannel_batches, elem, 2082 grpc_schedule_on_exec_ctx); 2083 closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE, 2084 "starting next batch for send_* op(s)"); 2085 } 2086 } 2087 2088 // Callback used to intercept on_complete from subchannel calls. 2089 // Called only when retries are enabled. 2090 static void on_complete(void* arg, grpc_error* error) { 2091 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg); 2092 grpc_call_element* elem = batch_data->elem; 2093 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 2094 call_data* calld = static_cast<call_data*>(elem->call_data); 2095 if (grpc_client_channel_trace.enabled()) { 2096 char* batch_str = grpc_transport_stream_op_batch_string(&batch_data->batch); 2097 gpr_log(GPR_INFO, "chand=%p calld=%p: got on_complete, error=%s, batch=%s", 2098 chand, calld, grpc_error_string(error), batch_str); 2099 gpr_free(batch_str); 2100 } 2101 subchannel_call_retry_state* retry_state = 2102 static_cast<subchannel_call_retry_state*>( 2103 grpc_connected_subchannel_call_get_parent_data( 2104 batch_data->subchannel_call)); 2105 // Update bookkeeping in retry_state. 2106 if (batch_data->batch.send_initial_metadata) { 2107 retry_state->completed_send_initial_metadata = true; 2108 } 2109 if (batch_data->batch.send_message) { 2110 ++retry_state->completed_send_message_count; 2111 } 2112 if (batch_data->batch.send_trailing_metadata) { 2113 retry_state->completed_send_trailing_metadata = true; 2114 } 2115 // If the call is committed, free cached data for send ops that we've just 2116 // completed. 2117 if (calld->retry_committed) { 2118 free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state); 2119 } 2120 // Construct list of closures to execute. 2121 grpc_core::CallCombinerClosureList closures; 2122 // If a retry was already dispatched, that means we saw 2123 // recv_trailing_metadata before this, so we do nothing here. 2124 // Otherwise, invoke the callback to return the result to the surface. 2125 if (!retry_state->retry_dispatched) { 2126 // Add closure for the completed pending batch, if any. 2127 add_closure_for_completed_pending_batch(elem, batch_data, retry_state, 2128 GRPC_ERROR_REF(error), &closures); 2129 // If needed, add a callback to start any replay or pending send ops on 2130 // the subchannel call. 2131 if (!retry_state->completed_recv_trailing_metadata) { 2132 add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state, 2133 &closures); 2134 } 2135 } 2136 // Track number of pending subchannel send batches and determine if this 2137 // was the last one. 2138 --calld->num_pending_retriable_subchannel_send_batches; 2139 const bool last_send_batch_complete = 2140 calld->num_pending_retriable_subchannel_send_batches == 0; 2141 // Don't need batch_data anymore. 2142 batch_data_unref(batch_data); 2143 // Schedule all of the closures identified above. 2144 // Note: This yeilds the call combiner. 2145 closures.RunClosures(calld->call_combiner); 2146 // If this was the last subchannel send batch, unref the call stack. 2147 if (last_send_batch_complete) { 2148 GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches"); 2149 } 2150 } 2151 2152 // 2153 // subchannel batch construction 2154 // 2155 2156 // Helper function used to start a subchannel batch in the call combiner. 2157 static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) { 2158 grpc_transport_stream_op_batch* batch = 2159 static_cast<grpc_transport_stream_op_batch*>(arg); 2160 grpc_subchannel_call* subchannel_call = 2161 static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg); 2162 // Note: This will release the call combiner. 2163 grpc_subchannel_call_process_op(subchannel_call, batch); 2164 } 2165 2166 // Adds a closure to closures that will execute batch in the call combiner. 2167 static void add_closure_for_subchannel_batch( 2168 grpc_call_element* elem, grpc_transport_stream_op_batch* batch, 2169 grpc_core::CallCombinerClosureList* closures) { 2170 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 2171 call_data* calld = static_cast<call_data*>(elem->call_data); 2172 batch->handler_private.extra_arg = calld->subchannel_call; 2173 GRPC_CLOSURE_INIT(&batch->handler_private.closure, 2174 start_batch_in_call_combiner, batch, 2175 grpc_schedule_on_exec_ctx); 2176 if (grpc_client_channel_trace.enabled()) { 2177 char* batch_str = grpc_transport_stream_op_batch_string(batch); 2178 gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand, 2179 calld, batch_str); 2180 gpr_free(batch_str); 2181 } 2182 closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE, 2183 "start_subchannel_batch"); 2184 } 2185 2186 // Adds retriable send_initial_metadata op to batch_data. 2187 static void add_retriable_send_initial_metadata_op( 2188 call_data* calld, subchannel_call_retry_state* retry_state, 2189 subchannel_batch_data* batch_data) { 2190 // Maps the number of retries to the corresponding metadata value slice. 2191 static const grpc_slice* retry_count_strings[] = { 2192 &GRPC_MDSTR_1, &GRPC_MDSTR_2, &GRPC_MDSTR_3, &GRPC_MDSTR_4}; 2193 // We need to make a copy of the metadata batch for each attempt, since 2194 // the filters in the subchannel stack may modify this batch, and we don't 2195 // want those modifications to be passed forward to subsequent attempts. 2196 // 2197 // If we've already completed one or more attempts, add the 2198 // grpc-retry-attempts header. 2199 retry_state->send_initial_metadata_storage = 2200 static_cast<grpc_linked_mdelem*>(gpr_arena_alloc( 2201 calld->arena, sizeof(grpc_linked_mdelem) * 2202 (calld->send_initial_metadata.list.count + 2203 (calld->num_attempts_completed > 0)))); 2204 grpc_metadata_batch_copy(&calld->send_initial_metadata, 2205 &retry_state->send_initial_metadata, 2206 retry_state->send_initial_metadata_storage); 2207 if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named 2208 .grpc_previous_rpc_attempts != nullptr)) { 2209 grpc_metadata_batch_remove(&retry_state->send_initial_metadata, 2210 retry_state->send_initial_metadata.idx.named 2211 .grpc_previous_rpc_attempts); 2212 } 2213 if (GPR_UNLIKELY(calld->num_attempts_completed > 0)) { 2214 grpc_mdelem retry_md = grpc_mdelem_from_slices( 2215 GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS, 2216 *retry_count_strings[calld->num_attempts_completed - 1]); 2217 grpc_error* error = grpc_metadata_batch_add_tail( 2218 &retry_state->send_initial_metadata, 2219 &retry_state->send_initial_metadata_storage[calld->send_initial_metadata 2220 .list.count], 2221 retry_md); 2222 if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { 2223 gpr_log(GPR_ERROR, "error adding retry metadata: %s", 2224 grpc_error_string(error)); 2225 GPR_ASSERT(false); 2226 } 2227 } 2228 retry_state->started_send_initial_metadata = true; 2229 batch_data->batch.send_initial_metadata = true; 2230 batch_data->batch.payload->send_initial_metadata.send_initial_metadata = 2231 &retry_state->send_initial_metadata; 2232 batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags = 2233 calld->send_initial_metadata_flags; 2234 batch_data->batch.payload->send_initial_metadata.peer_string = 2235 calld->peer_string; 2236 } 2237 2238 // Adds retriable send_message op to batch_data. 2239 static void add_retriable_send_message_op( 2240 grpc_call_element* elem, subchannel_call_retry_state* retry_state, 2241 subchannel_batch_data* batch_data) { 2242 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 2243 call_data* calld = static_cast<call_data*>(elem->call_data); 2244 if (grpc_client_channel_trace.enabled()) { 2245 gpr_log(GPR_INFO, 2246 "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]", 2247 chand, calld, retry_state->started_send_message_count); 2248 } 2249 grpc_core::ByteStreamCache* cache = 2250 (*calld->send_messages)[retry_state->started_send_message_count]; 2251 ++retry_state->started_send_message_count; 2252 retry_state->send_message.Init(cache); 2253 batch_data->batch.send_message = true; 2254 batch_data->batch.payload->send_message.send_message.reset( 2255 retry_state->send_message.get()); 2256 } 2257 2258 // Adds retriable send_trailing_metadata op to batch_data. 2259 static void add_retriable_send_trailing_metadata_op( 2260 call_data* calld, subchannel_call_retry_state* retry_state, 2261 subchannel_batch_data* batch_data) { 2262 // We need to make a copy of the metadata batch for each attempt, since 2263 // the filters in the subchannel stack may modify this batch, and we don't 2264 // want those modifications to be passed forward to subsequent attempts. 2265 retry_state->send_trailing_metadata_storage = 2266 static_cast<grpc_linked_mdelem*>(gpr_arena_alloc( 2267 calld->arena, sizeof(grpc_linked_mdelem) * 2268 calld->send_trailing_metadata.list.count)); 2269 grpc_metadata_batch_copy(&calld->send_trailing_metadata, 2270 &retry_state->send_trailing_metadata, 2271 retry_state->send_trailing_metadata_storage); 2272 retry_state->started_send_trailing_metadata = true; 2273 batch_data->batch.send_trailing_metadata = true; 2274 batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata = 2275 &retry_state->send_trailing_metadata; 2276 } 2277 2278 // Adds retriable recv_initial_metadata op to batch_data. 2279 static void add_retriable_recv_initial_metadata_op( 2280 call_data* calld, subchannel_call_retry_state* retry_state, 2281 subchannel_batch_data* batch_data) { 2282 retry_state->started_recv_initial_metadata = true; 2283 batch_data->batch.recv_initial_metadata = true; 2284 grpc_metadata_batch_init(&retry_state->recv_initial_metadata); 2285 batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata = 2286 &retry_state->recv_initial_metadata; 2287 batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available = 2288 &retry_state->trailing_metadata_available; 2289 GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready, 2290 recv_initial_metadata_ready, batch_data, 2291 grpc_schedule_on_exec_ctx); 2292 batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready = 2293 &retry_state->recv_initial_metadata_ready; 2294 } 2295 2296 // Adds retriable recv_message op to batch_data. 2297 static void add_retriable_recv_message_op( 2298 call_data* calld, subchannel_call_retry_state* retry_state, 2299 subchannel_batch_data* batch_data) { 2300 ++retry_state->started_recv_message_count; 2301 batch_data->batch.recv_message = true; 2302 batch_data->batch.payload->recv_message.recv_message = 2303 &retry_state->recv_message; 2304 GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, recv_message_ready, 2305 batch_data, grpc_schedule_on_exec_ctx); 2306 batch_data->batch.payload->recv_message.recv_message_ready = 2307 &retry_state->recv_message_ready; 2308 } 2309 2310 // Adds retriable recv_trailing_metadata op to batch_data. 2311 static void add_retriable_recv_trailing_metadata_op( 2312 call_data* calld, subchannel_call_retry_state* retry_state, 2313 subchannel_batch_data* batch_data) { 2314 retry_state->started_recv_trailing_metadata = true; 2315 batch_data->batch.recv_trailing_metadata = true; 2316 grpc_metadata_batch_init(&retry_state->recv_trailing_metadata); 2317 batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata = 2318 &retry_state->recv_trailing_metadata; 2319 batch_data->batch.payload->recv_trailing_metadata.collect_stats = 2320 &retry_state->collect_stats; 2321 GRPC_CLOSURE_INIT(&retry_state->recv_trailing_metadata_ready, 2322 recv_trailing_metadata_ready, batch_data, 2323 grpc_schedule_on_exec_ctx); 2324 batch_data->batch.payload->recv_trailing_metadata 2325 .recv_trailing_metadata_ready = 2326 &retry_state->recv_trailing_metadata_ready; 2327 } 2328 2329 // Helper function used to start a recv_trailing_metadata batch. This 2330 // is used in the case where a recv_initial_metadata or recv_message 2331 // op fails in a way that we know the call is over but when the application 2332 // has not yet started its own recv_trailing_metadata op. 2333 static void start_internal_recv_trailing_metadata(grpc_call_element* elem) { 2334 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 2335 call_data* calld = static_cast<call_data*>(elem->call_data); 2336 if (grpc_client_channel_trace.enabled()) { 2337 gpr_log(GPR_INFO, 2338 "chand=%p calld=%p: call failed but recv_trailing_metadata not " 2339 "started; starting it internally", 2340 chand, calld); 2341 } 2342 subchannel_call_retry_state* retry_state = 2343 static_cast<subchannel_call_retry_state*>( 2344 grpc_connected_subchannel_call_get_parent_data( 2345 calld->subchannel_call)); 2346 // Create batch_data with 2 refs, since this batch will be unreffed twice: 2347 // once for the recv_trailing_metadata_ready callback when the subchannel 2348 // batch returns, and again when we actually get a recv_trailing_metadata 2349 // op from the surface. 2350 subchannel_batch_data* batch_data = 2351 batch_data_create(elem, 2, false /* set_on_complete */); 2352 add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data); 2353 retry_state->recv_trailing_metadata_internal_batch = batch_data; 2354 // Note: This will release the call combiner. 2355 grpc_subchannel_call_process_op(calld->subchannel_call, &batch_data->batch); 2356 } 2357 2358 // If there are any cached send ops that need to be replayed on the 2359 // current subchannel call, creates and returns a new subchannel batch 2360 // to replay those ops. Otherwise, returns nullptr. 2361 static subchannel_batch_data* maybe_create_subchannel_batch_for_replay( 2362 grpc_call_element* elem, subchannel_call_retry_state* retry_state) { 2363 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 2364 call_data* calld = static_cast<call_data*>(elem->call_data); 2365 subchannel_batch_data* replay_batch_data = nullptr; 2366 // send_initial_metadata. 2367 if (calld->seen_send_initial_metadata && 2368 !retry_state->started_send_initial_metadata && 2369 !calld->pending_send_initial_metadata) { 2370 if (grpc_client_channel_trace.enabled()) { 2371 gpr_log(GPR_INFO, 2372 "chand=%p calld=%p: replaying previously completed " 2373 "send_initial_metadata op", 2374 chand, calld); 2375 } 2376 replay_batch_data = batch_data_create(elem, 1, true /* set_on_complete */); 2377 add_retriable_send_initial_metadata_op(calld, retry_state, 2378 replay_batch_data); 2379 } 2380 // send_message. 2381 // Note that we can only have one send_message op in flight at a time. 2382 if (retry_state->started_send_message_count < calld->send_messages->size() && 2383 retry_state->started_send_message_count == 2384 retry_state->completed_send_message_count && 2385 !calld->pending_send_message) { 2386 if (grpc_client_channel_trace.enabled()) { 2387 gpr_log(GPR_INFO, 2388 "chand=%p calld=%p: replaying previously completed " 2389 "send_message op", 2390 chand, calld); 2391 } 2392 if (replay_batch_data == nullptr) { 2393 replay_batch_data = 2394 batch_data_create(elem, 1, true /* set_on_complete */); 2395 } 2396 add_retriable_send_message_op(elem, retry_state, replay_batch_data); 2397 } 2398 // send_trailing_metadata. 2399 // Note that we only add this op if we have no more send_message ops 2400 // to start, since we can't send down any more send_message ops after 2401 // send_trailing_metadata. 2402 if (calld->seen_send_trailing_metadata && 2403 retry_state->started_send_message_count == calld->send_messages->size() && 2404 !retry_state->started_send_trailing_metadata && 2405 !calld->pending_send_trailing_metadata) { 2406 if (grpc_client_channel_trace.enabled()) { 2407 gpr_log(GPR_INFO, 2408 "chand=%p calld=%p: replaying previously completed " 2409 "send_trailing_metadata op", 2410 chand, calld); 2411 } 2412 if (replay_batch_data == nullptr) { 2413 replay_batch_data = 2414 batch_data_create(elem, 1, true /* set_on_complete */); 2415 } 2416 add_retriable_send_trailing_metadata_op(calld, retry_state, 2417 replay_batch_data); 2418 } 2419 return replay_batch_data; 2420 } 2421 2422 // Adds subchannel batches for pending batches to batches, updating 2423 // *num_batches as needed. 2424 static void add_subchannel_batches_for_pending_batches( 2425 grpc_call_element* elem, subchannel_call_retry_state* retry_state, 2426 grpc_core::CallCombinerClosureList* closures) { 2427 call_data* calld = static_cast<call_data*>(elem->call_data); 2428 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { 2429 pending_batch* pending = &calld->pending_batches[i]; 2430 grpc_transport_stream_op_batch* batch = pending->batch; 2431 if (batch == nullptr) continue; 2432 // Skip any batch that either (a) has already been started on this 2433 // subchannel call or (b) we can't start yet because we're still 2434 // replaying send ops that need to be completed first. 2435 // TODO(roth): Note that if any one op in the batch can't be sent 2436 // yet due to ops that we're replaying, we don't start any of the ops 2437 // in the batch. This is probably okay, but it could conceivably 2438 // lead to increased latency in some cases -- e.g., we could delay 2439 // starting a recv op due to it being in the same batch with a send 2440 // op. If/when we revamp the callback protocol in 2441 // transport_stream_op_batch, we may be able to fix this. 2442 if (batch->send_initial_metadata && 2443 retry_state->started_send_initial_metadata) { 2444 continue; 2445 } 2446 if (batch->send_message && retry_state->completed_send_message_count < 2447 retry_state->started_send_message_count) { 2448 continue; 2449 } 2450 // Note that we only start send_trailing_metadata if we have no more 2451 // send_message ops to start, since we can't send down any more 2452 // send_message ops after send_trailing_metadata. 2453 if (batch->send_trailing_metadata && 2454 (retry_state->started_send_message_count + batch->send_message < 2455 calld->send_messages->size() || 2456 retry_state->started_send_trailing_metadata)) { 2457 continue; 2458 } 2459 if (batch->recv_initial_metadata && 2460 retry_state->started_recv_initial_metadata) { 2461 continue; 2462 } 2463 if (batch->recv_message && retry_state->completed_recv_message_count < 2464 retry_state->started_recv_message_count) { 2465 continue; 2466 } 2467 if (batch->recv_trailing_metadata && 2468 retry_state->started_recv_trailing_metadata) { 2469 // If we previously completed a recv_trailing_metadata op 2470 // initiated by start_internal_recv_trailing_metadata(), use the 2471 // result of that instead of trying to re-start this op. 2472 if (GPR_UNLIKELY((retry_state->recv_trailing_metadata_internal_batch != 2473 nullptr))) { 2474 // If the batch completed, then trigger the completion callback 2475 // directly, so that we return the previously returned results to 2476 // the application. Otherwise, just unref the internally 2477 // started subchannel batch, since we'll propagate the 2478 // completion when it completes. 2479 if (retry_state->completed_recv_trailing_metadata) { 2480 // Batches containing recv_trailing_metadata always succeed. 2481 closures->Add( 2482 &retry_state->recv_trailing_metadata_ready, GRPC_ERROR_NONE, 2483 "re-executing recv_trailing_metadata_ready to propagate " 2484 "internally triggered result"); 2485 } else { 2486 batch_data_unref(retry_state->recv_trailing_metadata_internal_batch); 2487 } 2488 retry_state->recv_trailing_metadata_internal_batch = nullptr; 2489 } 2490 continue; 2491 } 2492 // If we're not retrying, just send the batch as-is. 2493 if (calld->method_params == nullptr || 2494 calld->method_params->retry_policy() == nullptr || 2495 calld->retry_committed) { 2496 add_closure_for_subchannel_batch(elem, batch, closures); 2497 pending_batch_clear(calld, pending); 2498 continue; 2499 } 2500 // Create batch with the right number of callbacks. 2501 const bool has_send_ops = batch->send_initial_metadata || 2502 batch->send_message || 2503 batch->send_trailing_metadata; 2504 const int num_callbacks = has_send_ops + batch->recv_initial_metadata + 2505 batch->recv_message + 2506 batch->recv_trailing_metadata; 2507 subchannel_batch_data* batch_data = batch_data_create( 2508 elem, num_callbacks, has_send_ops /* set_on_complete */); 2509 // Cache send ops if needed. 2510 maybe_cache_send_ops_for_batch(calld, pending); 2511 // send_initial_metadata. 2512 if (batch->send_initial_metadata) { 2513 add_retriable_send_initial_metadata_op(calld, retry_state, batch_data); 2514 } 2515 // send_message. 2516 if (batch->send_message) { 2517 add_retriable_send_message_op(elem, retry_state, batch_data); 2518 } 2519 // send_trailing_metadata. 2520 if (batch->send_trailing_metadata) { 2521 add_retriable_send_trailing_metadata_op(calld, retry_state, batch_data); 2522 } 2523 // recv_initial_metadata. 2524 if (batch->recv_initial_metadata) { 2525 // recv_flags is only used on the server side. 2526 GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr); 2527 add_retriable_recv_initial_metadata_op(calld, retry_state, batch_data); 2528 } 2529 // recv_message. 2530 if (batch->recv_message) { 2531 add_retriable_recv_message_op(calld, retry_state, batch_data); 2532 } 2533 // recv_trailing_metadata. 2534 if (batch->recv_trailing_metadata) { 2535 add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data); 2536 } 2537 add_closure_for_subchannel_batch(elem, &batch_data->batch, closures); 2538 // Track number of pending subchannel send batches. 2539 // If this is the first one, take a ref to the call stack. 2540 if (batch->send_initial_metadata || batch->send_message || 2541 batch->send_trailing_metadata) { 2542 if (calld->num_pending_retriable_subchannel_send_batches == 0) { 2543 GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches"); 2544 } 2545 ++calld->num_pending_retriable_subchannel_send_batches; 2546 } 2547 } 2548 } 2549 2550 // Constructs and starts whatever subchannel batches are needed on the 2551 // subchannel call. 2552 static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) { 2553 grpc_call_element* elem = static_cast<grpc_call_element*>(arg); 2554 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 2555 call_data* calld = static_cast<call_data*>(elem->call_data); 2556 if (grpc_client_channel_trace.enabled()) { 2557 gpr_log(GPR_INFO, "chand=%p calld=%p: constructing retriable batches", 2558 chand, calld); 2559 } 2560 subchannel_call_retry_state* retry_state = 2561 static_cast<subchannel_call_retry_state*>( 2562 grpc_connected_subchannel_call_get_parent_data( 2563 calld->subchannel_call)); 2564 // Construct list of closures to execute, one for each pending batch. 2565 grpc_core::CallCombinerClosureList closures; 2566 // Replay previously-returned send_* ops if needed. 2567 subchannel_batch_data* replay_batch_data = 2568 maybe_create_subchannel_batch_for_replay(elem, retry_state); 2569 if (replay_batch_data != nullptr) { 2570 add_closure_for_subchannel_batch(elem, &replay_batch_data->batch, 2571 &closures); 2572 // Track number of pending subchannel send batches. 2573 // If this is the first one, take a ref to the call stack. 2574 if (calld->num_pending_retriable_subchannel_send_batches == 0) { 2575 GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches"); 2576 } 2577 ++calld->num_pending_retriable_subchannel_send_batches; 2578 } 2579 // Now add pending batches. 2580 add_subchannel_batches_for_pending_batches(elem, retry_state, &closures); 2581 // Start batches on subchannel call. 2582 if (grpc_client_channel_trace.enabled()) { 2583 gpr_log(GPR_INFO, 2584 "chand=%p calld=%p: starting %" PRIuPTR 2585 " retriable batches on subchannel_call=%p", 2586 chand, calld, closures.size(), calld->subchannel_call); 2587 } 2588 // Note: This will yield the call combiner. 2589 closures.RunClosures(calld->call_combiner); 2590 } 2591 2592 // 2593 // Channelz 2594 // 2595 2596 static void recv_trailing_metadata_ready_channelz(void* arg, 2597 grpc_error* error) { 2598 grpc_call_element* elem = static_cast<grpc_call_element*>(arg); 2599 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 2600 call_data* calld = static_cast<call_data*>(elem->call_data); 2601 if (grpc_client_channel_trace.enabled()) { 2602 gpr_log(GPR_INFO, 2603 "chand=%p calld=%p: got recv_trailing_metadata_ready_channelz, " 2604 "error=%s", 2605 chand, calld, grpc_error_string(error)); 2606 } 2607 GPR_ASSERT(calld->recv_trailing_metadata != nullptr); 2608 grpc_status_code status = GRPC_STATUS_OK; 2609 grpc_metadata_batch* md_batch = calld->recv_trailing_metadata; 2610 get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, nullptr); 2611 grpc_core::channelz::SubchannelNode* channelz_subchannel = 2612 calld->pick.connected_subchannel->channelz_subchannel(); 2613 GPR_ASSERT(channelz_subchannel != nullptr); 2614 if (status == GRPC_STATUS_OK) { 2615 channelz_subchannel->RecordCallSucceeded(); 2616 } else { 2617 channelz_subchannel->RecordCallFailed(); 2618 } 2619 calld->recv_trailing_metadata = nullptr; 2620 GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata, error); 2621 } 2622 2623 // If channelz is enabled, intercept recv_trailing so that we may check the 2624 // status and associate it to a subchannel. 2625 // Returns true if callback was intercepted, false otherwise. 2626 static void maybe_intercept_recv_trailing_metadata_for_channelz( 2627 grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { 2628 call_data* calld = static_cast<call_data*>(elem->call_data); 2629 // only intercept payloads with recv trailing. 2630 if (!batch->recv_trailing_metadata) { 2631 return; 2632 } 2633 // only add interceptor is channelz is enabled. 2634 if (calld->pick.connected_subchannel->channelz_subchannel() == nullptr) { 2635 return; 2636 } 2637 if (grpc_client_channel_trace.enabled()) { 2638 gpr_log(GPR_INFO, 2639 "calld=%p batch=%p: intercepting recv trailing for channelz", calld, 2640 batch); 2641 } 2642 GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_channelz, 2643 recv_trailing_metadata_ready_channelz, elem, 2644 grpc_schedule_on_exec_ctx); 2645 // save some state needed for the interception callback. 2646 GPR_ASSERT(calld->recv_trailing_metadata == nullptr); 2647 calld->recv_trailing_metadata = 2648 batch->payload->recv_trailing_metadata.recv_trailing_metadata; 2649 calld->original_recv_trailing_metadata = 2650 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; 2651 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = 2652 &calld->recv_trailing_metadata_ready_channelz; 2653 } 2654 2655 // 2656 // LB pick 2657 // 2658 2659 static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) { 2660 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 2661 call_data* calld = static_cast<call_data*>(elem->call_data); 2662 const size_t parent_data_size = 2663 calld->enable_retries ? sizeof(subchannel_call_retry_state) : 0; 2664 const grpc_core::ConnectedSubchannel::CallArgs call_args = { 2665 calld->pollent, // pollent 2666 calld->path, // path 2667 calld->call_start_time, // start_time 2668 calld->deadline, // deadline 2669 calld->arena, // arena 2670 calld->pick.subchannel_call_context, // context 2671 calld->call_combiner, // call_combiner 2672 parent_data_size // parent_data_size 2673 }; 2674 grpc_error* new_error = calld->pick.connected_subchannel->CreateCall( 2675 call_args, &calld->subchannel_call); 2676 if (grpc_client_channel_trace.enabled()) { 2677 gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s", 2678 chand, calld, calld->subchannel_call, grpc_error_string(new_error)); 2679 } 2680 if (GPR_UNLIKELY(new_error != GRPC_ERROR_NONE)) { 2681 new_error = grpc_error_add_child(new_error, error); 2682 pending_batches_fail(elem, new_error, true /* yield_call_combiner */); 2683 } else { 2684 grpc_core::channelz::SubchannelNode* channelz_subchannel = 2685 calld->pick.connected_subchannel->channelz_subchannel(); 2686 if (channelz_subchannel != nullptr) { 2687 channelz_subchannel->RecordCallStarted(); 2688 } 2689 if (parent_data_size > 0) { 2690 subchannel_call_retry_state* retry_state = 2691 static_cast<subchannel_call_retry_state*>( 2692 grpc_connected_subchannel_call_get_parent_data( 2693 calld->subchannel_call)); 2694 retry_state->batch_payload.context = calld->pick.subchannel_call_context; 2695 } 2696 pending_batches_resume(elem); 2697 } 2698 GRPC_ERROR_UNREF(error); 2699 } 2700 2701 // Invoked when a pick is completed, on both success or failure. 2702 static void pick_done(void* arg, grpc_error* error) { 2703 grpc_call_element* elem = static_cast<grpc_call_element*>(arg); 2704 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 2705 call_data* calld = static_cast<call_data*>(elem->call_data); 2706 if (GPR_UNLIKELY(calld->pick.connected_subchannel == nullptr)) { 2707 // Failed to create subchannel. 2708 // If there was no error, this is an LB policy drop, in which case 2709 // we return an error; otherwise, we may retry. 2710 grpc_status_code status = GRPC_STATUS_OK; 2711 grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr, 2712 nullptr); 2713 if (error == GRPC_ERROR_NONE || !calld->enable_retries || 2714 !maybe_retry(elem, nullptr /* batch_data */, status, 2715 nullptr /* server_pushback_md */)) { 2716 grpc_error* new_error = 2717 error == GRPC_ERROR_NONE 2718 ? GRPC_ERROR_CREATE_FROM_STATIC_STRING( 2719 "Call dropped by load balancing policy") 2720 : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( 2721 "Failed to create subchannel", &error, 1); 2722 if (grpc_client_channel_trace.enabled()) { 2723 gpr_log(GPR_INFO, 2724 "chand=%p calld=%p: failed to create subchannel: error=%s", 2725 chand, calld, grpc_error_string(new_error)); 2726 } 2727 pending_batches_fail(elem, new_error, true /* yield_call_combiner */); 2728 } 2729 } else { 2730 /* Create call on subchannel. */ 2731 create_subchannel_call(elem, GRPC_ERROR_REF(error)); 2732 } 2733 } 2734 2735 static void maybe_add_call_to_channel_interested_parties_locked( 2736 grpc_call_element* elem) { 2737 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 2738 call_data* calld = static_cast<call_data*>(elem->call_data); 2739 if (!calld->pollent_added_to_interested_parties) { 2740 calld->pollent_added_to_interested_parties = true; 2741 grpc_polling_entity_add_to_pollset_set(calld->pollent, 2742 chand->interested_parties); 2743 } 2744 } 2745 2746 static void maybe_del_call_from_channel_interested_parties_locked( 2747 grpc_call_element* elem) { 2748 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 2749 call_data* calld = static_cast<call_data*>(elem->call_data); 2750 if (calld->pollent_added_to_interested_parties) { 2751 calld->pollent_added_to_interested_parties = false; 2752 grpc_polling_entity_del_from_pollset_set(calld->pollent, 2753 chand->interested_parties); 2754 } 2755 } 2756 2757 // Invoked when a pick is completed to leave the client_channel combiner 2758 // and continue processing in the call combiner. 2759 // If needed, removes the call's polling entity from chand->interested_parties. 2760 static void pick_done_locked(grpc_call_element* elem, grpc_error* error) { 2761 call_data* calld = static_cast<call_data*>(elem->call_data); 2762 maybe_del_call_from_channel_interested_parties_locked(elem); 2763 GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem, 2764 grpc_schedule_on_exec_ctx); 2765 GRPC_CLOSURE_SCHED(&calld->pick_closure, error); 2766 } 2767 2768 namespace grpc_core { 2769 2770 // Performs subchannel pick via LB policy. 2771 class LbPicker { 2772 public: 2773 // Starts a pick on chand->lb_policy. 2774 static void StartLocked(grpc_call_element* elem) { 2775 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 2776 call_data* calld = static_cast<call_data*>(elem->call_data); 2777 if (grpc_client_channel_trace.enabled()) { 2778 gpr_log(GPR_INFO, "chand=%p calld=%p: starting pick on lb_policy=%p", 2779 chand, calld, chand->lb_policy.get()); 2780 } 2781 // If this is a retry, use the send_initial_metadata payload that 2782 // we've cached; otherwise, use the pending batch. The 2783 // send_initial_metadata batch will be the first pending batch in the 2784 // list, as set by get_batch_index() above. 2785 calld->pick.initial_metadata = 2786 calld->seen_send_initial_metadata 2787 ? &calld->send_initial_metadata 2788 : calld->pending_batches[0] 2789 .batch->payload->send_initial_metadata.send_initial_metadata; 2790 calld->pick.initial_metadata_flags = 2791 calld->seen_send_initial_metadata 2792 ? calld->send_initial_metadata_flags 2793 : calld->pending_batches[0] 2794 .batch->payload->send_initial_metadata 2795 .send_initial_metadata_flags; 2796 GRPC_CLOSURE_INIT(&calld->pick_closure, &LbPicker::DoneLocked, elem, 2797 grpc_combiner_scheduler(chand->combiner)); 2798 calld->pick.on_complete = &calld->pick_closure; 2799 GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback"); 2800 grpc_error* error = GRPC_ERROR_NONE; 2801 const bool pick_done = chand->lb_policy->PickLocked(&calld->pick, &error); 2802 if (GPR_LIKELY(pick_done)) { 2803 // Pick completed synchronously. 2804 if (grpc_client_channel_trace.enabled()) { 2805 gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed synchronously", 2806 chand, calld); 2807 } 2808 pick_done_locked(elem, error); 2809 GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback"); 2810 } else { 2811 // Pick will be returned asynchronously. 2812 // Add the polling entity from call_data to the channel_data's 2813 // interested_parties, so that the I/O of the LB policy can be done 2814 // under it. It will be removed in pick_done_locked(). 2815 maybe_add_call_to_channel_interested_parties_locked(elem); 2816 // Request notification on call cancellation. 2817 GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel"); 2818 grpc_call_combiner_set_notify_on_cancel( 2819 calld->call_combiner, 2820 GRPC_CLOSURE_INIT(&calld->pick_cancel_closure, 2821 &LbPicker::CancelLocked, elem, 2822 grpc_combiner_scheduler(chand->combiner))); 2823 } 2824 } 2825 2826 private: 2827 // Callback invoked by LoadBalancingPolicy::PickLocked() for async picks. 2828 // Unrefs the LB policy and invokes pick_done_locked(). 2829 static void DoneLocked(void* arg, grpc_error* error) { 2830 grpc_call_element* elem = static_cast<grpc_call_element*>(arg); 2831 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 2832 call_data* calld = static_cast<call_data*>(elem->call_data); 2833 if (grpc_client_channel_trace.enabled()) { 2834 gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed asynchronously", 2835 chand, calld); 2836 } 2837 pick_done_locked(elem, GRPC_ERROR_REF(error)); 2838 GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback"); 2839 } 2840 2841 // Note: This runs under the client_channel combiner, but will NOT be 2842 // holding the call combiner. 2843 static void CancelLocked(void* arg, grpc_error* error) { 2844 grpc_call_element* elem = static_cast<grpc_call_element*>(arg); 2845 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 2846 call_data* calld = static_cast<call_data*>(elem->call_data); 2847 // Note: chand->lb_policy may have changed since we started our pick, 2848 // in which case we will be cancelling the pick on a policy other than 2849 // the one we started it on. However, this will just be a no-op. 2850 if (GPR_UNLIKELY(error != GRPC_ERROR_NONE && chand->lb_policy != nullptr)) { 2851 if (grpc_client_channel_trace.enabled()) { 2852 gpr_log(GPR_INFO, 2853 "chand=%p calld=%p: cancelling pick from LB policy %p", chand, 2854 calld, chand->lb_policy.get()); 2855 } 2856 chand->lb_policy->CancelPickLocked(&calld->pick, GRPC_ERROR_REF(error)); 2857 } 2858 GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel"); 2859 } 2860 }; 2861 2862 } // namespace grpc_core 2863 2864 // Applies service config to the call. Must be invoked once we know 2865 // that the resolver has returned results to the channel. 2866 static void apply_service_config_to_call_locked(grpc_call_element* elem) { 2867 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 2868 call_data* calld = static_cast<call_data*>(elem->call_data); 2869 if (grpc_client_channel_trace.enabled()) { 2870 gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call", 2871 chand, calld); 2872 } 2873 if (chand->retry_throttle_data != nullptr) { 2874 calld->retry_throttle_data = chand->retry_throttle_data->Ref(); 2875 } 2876 if (chand->method_params_table != nullptr) { 2877 calld->method_params = grpc_core::ServiceConfig::MethodConfigTableLookup( 2878 *chand->method_params_table, calld->path); 2879 if (calld->method_params != nullptr) { 2880 // If the deadline from the service config is shorter than the one 2881 // from the client API, reset the deadline timer. 2882 if (chand->deadline_checking_enabled && 2883 calld->method_params->timeout() != 0) { 2884 const grpc_millis per_method_deadline = 2885 grpc_timespec_to_millis_round_up(calld->call_start_time) + 2886 calld->method_params->timeout(); 2887 if (per_method_deadline < calld->deadline) { 2888 calld->deadline = per_method_deadline; 2889 grpc_deadline_state_reset(elem, calld->deadline); 2890 } 2891 } 2892 // If the service config set wait_for_ready and the application 2893 // did not explicitly set it, use the value from the service config. 2894 uint32_t* send_initial_metadata_flags = 2895 &calld->pending_batches[0] 2896 .batch->payload->send_initial_metadata 2897 .send_initial_metadata_flags; 2898 if (GPR_UNLIKELY( 2899 calld->method_params->wait_for_ready() != 2900 ClientChannelMethodParams::WAIT_FOR_READY_UNSET && 2901 !(*send_initial_metadata_flags & 2902 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET))) { 2903 if (calld->method_params->wait_for_ready() == 2904 ClientChannelMethodParams::WAIT_FOR_READY_TRUE) { 2905 *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY; 2906 } else { 2907 *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY; 2908 } 2909 } 2910 } 2911 } 2912 // If no retry policy, disable retries. 2913 // TODO(roth): Remove this when adding support for transparent retries. 2914 if (calld->method_params == nullptr || 2915 calld->method_params->retry_policy() == nullptr) { 2916 calld->enable_retries = false; 2917 } 2918 } 2919 2920 // Invoked once resolver results are available. 2921 static void process_service_config_and_start_lb_pick_locked( 2922 grpc_call_element* elem) { 2923 call_data* calld = static_cast<call_data*>(elem->call_data); 2924 // Only get service config data on the first attempt. 2925 if (GPR_LIKELY(calld->num_attempts_completed == 0)) { 2926 apply_service_config_to_call_locked(elem); 2927 } 2928 // Start LB pick. 2929 grpc_core::LbPicker::StartLocked(elem); 2930 } 2931 2932 namespace grpc_core { 2933 2934 // Handles waiting for a resolver result. 2935 // Used only for the first call on an idle channel. 2936 class ResolverResultWaiter { 2937 public: 2938 explicit ResolverResultWaiter(grpc_call_element* elem) : elem_(elem) { 2939 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 2940 call_data* calld = static_cast<call_data*>(elem->call_data); 2941 if (grpc_client_channel_trace.enabled()) { 2942 gpr_log(GPR_INFO, 2943 "chand=%p calld=%p: deferring pick pending resolver result", 2944 chand, calld); 2945 } 2946 // Add closure to be run when a resolver result is available. 2947 GRPC_CLOSURE_INIT(&done_closure_, &ResolverResultWaiter::DoneLocked, this, 2948 grpc_combiner_scheduler(chand->combiner)); 2949 AddToWaitingList(); 2950 // Set cancellation closure, so that we abort if the call is cancelled. 2951 GRPC_CLOSURE_INIT(&cancel_closure_, &ResolverResultWaiter::CancelLocked, 2952 this, grpc_combiner_scheduler(chand->combiner)); 2953 grpc_call_combiner_set_notify_on_cancel(calld->call_combiner, 2954 &cancel_closure_); 2955 } 2956 2957 private: 2958 // Adds closure_ to chand->waiting_for_resolver_result_closures. 2959 void AddToWaitingList() { 2960 channel_data* chand = static_cast<channel_data*>(elem_->channel_data); 2961 grpc_closure_list_append(&chand->waiting_for_resolver_result_closures, 2962 &done_closure_, GRPC_ERROR_NONE); 2963 } 2964 2965 // Invoked when a resolver result is available. 2966 static void DoneLocked(void* arg, grpc_error* error) { 2967 ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg); 2968 // If CancelLocked() has already run, delete ourselves without doing 2969 // anything. Note that the call stack may have already been destroyed, 2970 // so it's not safe to access anything in elem_. 2971 if (GPR_UNLIKELY(self->finished_)) { 2972 if (grpc_client_channel_trace.enabled()) { 2973 gpr_log(GPR_INFO, "call cancelled before resolver result"); 2974 } 2975 Delete(self); 2976 return; 2977 } 2978 // Otherwise, process the resolver result. 2979 grpc_call_element* elem = self->elem_; 2980 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 2981 call_data* calld = static_cast<call_data*>(elem->call_data); 2982 if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { 2983 if (grpc_client_channel_trace.enabled()) { 2984 gpr_log(GPR_INFO, "chand=%p calld=%p: resolver failed to return data", 2985 chand, calld); 2986 } 2987 pick_done_locked(elem, GRPC_ERROR_REF(error)); 2988 } else if (GPR_UNLIKELY(chand->resolver == nullptr)) { 2989 // Shutting down. 2990 if (grpc_client_channel_trace.enabled()) { 2991 gpr_log(GPR_INFO, "chand=%p calld=%p: resolver disconnected", chand, 2992 calld); 2993 } 2994 pick_done_locked(elem, 2995 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); 2996 } else if (GPR_UNLIKELY(chand->lb_policy == nullptr)) { 2997 // Transient resolver failure. 2998 // If call has wait_for_ready=true, try again; otherwise, fail. 2999 uint32_t send_initial_metadata_flags = 3000 calld->seen_send_initial_metadata 3001 ? calld->send_initial_metadata_flags 3002 : calld->pending_batches[0] 3003 .batch->payload->send_initial_metadata 3004 .send_initial_metadata_flags; 3005 if (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) { 3006 if (grpc_client_channel_trace.enabled()) { 3007 gpr_log(GPR_INFO, 3008 "chand=%p calld=%p: resolver returned but no LB policy; " 3009 "wait_for_ready=true; trying again", 3010 chand, calld); 3011 } 3012 // Re-add ourselves to the waiting list. 3013 self->AddToWaitingList(); 3014 // Return early so that we don't set finished_ to true below. 3015 return; 3016 } else { 3017 if (grpc_client_channel_trace.enabled()) { 3018 gpr_log(GPR_INFO, 3019 "chand=%p calld=%p: resolver returned but no LB policy; " 3020 "wait_for_ready=false; failing", 3021 chand, calld); 3022 } 3023 pick_done_locked( 3024 elem, 3025 grpc_error_set_int( 3026 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Name resolution failure"), 3027 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); 3028 } 3029 } else { 3030 if (grpc_client_channel_trace.enabled()) { 3031 gpr_log(GPR_INFO, "chand=%p calld=%p: resolver returned, doing LB pick", 3032 chand, calld); 3033 } 3034 process_service_config_and_start_lb_pick_locked(elem); 3035 } 3036 self->finished_ = true; 3037 } 3038 3039 // Invoked when the call is cancelled. 3040 // Note: This runs under the client_channel combiner, but will NOT be 3041 // holding the call combiner. 3042 static void CancelLocked(void* arg, grpc_error* error) { 3043 ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg); 3044 // If DoneLocked() has already run, delete ourselves without doing anything. 3045 if (GPR_LIKELY(self->finished_)) { 3046 Delete(self); 3047 return; 3048 } 3049 // If we are being cancelled, immediately invoke pick_done_locked() 3050 // to propagate the error back to the caller. 3051 if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { 3052 grpc_call_element* elem = self->elem_; 3053 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 3054 call_data* calld = static_cast<call_data*>(elem->call_data); 3055 if (grpc_client_channel_trace.enabled()) { 3056 gpr_log(GPR_INFO, 3057 "chand=%p calld=%p: cancelling call waiting for name " 3058 "resolution", 3059 chand, calld); 3060 } 3061 // Note: Although we are not in the call combiner here, we are 3062 // basically stealing the call combiner from the pending pick, so 3063 // it's safe to call pick_done_locked() here -- we are essentially 3064 // calling it here instead of calling it in DoneLocked(). 3065 pick_done_locked(elem, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( 3066 "Pick cancelled", &error, 1)); 3067 } 3068 self->finished_ = true; 3069 } 3070 3071 grpc_call_element* elem_; 3072 grpc_closure done_closure_; 3073 grpc_closure cancel_closure_; 3074 bool finished_ = false; 3075 }; 3076 3077 } // namespace grpc_core 3078 3079 static void start_pick_locked(void* arg, grpc_error* ignored) { 3080 grpc_call_element* elem = static_cast<grpc_call_element*>(arg); 3081 call_data* calld = static_cast<call_data*>(elem->call_data); 3082 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 3083 GPR_ASSERT(calld->pick.connected_subchannel == nullptr); 3084 GPR_ASSERT(calld->subchannel_call == nullptr); 3085 if (GPR_LIKELY(chand->lb_policy != nullptr)) { 3086 // We already have resolver results, so process the service config 3087 // and start an LB pick. 3088 process_service_config_and_start_lb_pick_locked(elem); 3089 } else if (GPR_UNLIKELY(chand->resolver == nullptr)) { 3090 pick_done_locked(elem, 3091 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); 3092 } else { 3093 // We do not yet have an LB policy, so wait for a resolver result. 3094 if (GPR_UNLIKELY(!chand->started_resolving)) { 3095 start_resolving_locked(chand); 3096 } 3097 // Create a new waiter, which will delete itself when done. 3098 grpc_core::New<grpc_core::ResolverResultWaiter>(elem); 3099 // Add the polling entity from call_data to the channel_data's 3100 // interested_parties, so that the I/O of the resolver can be done 3101 // under it. It will be removed in pick_done_locked(). 3102 maybe_add_call_to_channel_interested_parties_locked(elem); 3103 } 3104 } 3105 3106 // 3107 // filter call vtable functions 3108 // 3109 3110 static void cc_start_transport_stream_op_batch( 3111 grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { 3112 GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0); 3113 call_data* calld = static_cast<call_data*>(elem->call_data); 3114 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 3115 if (GPR_LIKELY(chand->deadline_checking_enabled)) { 3116 grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch); 3117 } 3118 // If we've previously been cancelled, immediately fail any new batches. 3119 if (GPR_UNLIKELY(calld->cancel_error != GRPC_ERROR_NONE)) { 3120 if (grpc_client_channel_trace.enabled()) { 3121 gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s", 3122 chand, calld, grpc_error_string(calld->cancel_error)); 3123 } 3124 // Note: This will release the call combiner. 3125 grpc_transport_stream_op_batch_finish_with_failure( 3126 batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner); 3127 return; 3128 } 3129 // Handle cancellation. 3130 if (GPR_UNLIKELY(batch->cancel_stream)) { 3131 // Stash a copy of cancel_error in our call data, so that we can use 3132 // it for subsequent operations. This ensures that if the call is 3133 // cancelled before any batches are passed down (e.g., if the deadline 3134 // is in the past when the call starts), we can return the right 3135 // error to the caller when the first batch does get passed down. 3136 GRPC_ERROR_UNREF(calld->cancel_error); 3137 calld->cancel_error = 3138 GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error); 3139 if (grpc_client_channel_trace.enabled()) { 3140 gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand, 3141 calld, grpc_error_string(calld->cancel_error)); 3142 } 3143 // If we do not have a subchannel call (i.e., a pick has not yet 3144 // been started), fail all pending batches. Otherwise, send the 3145 // cancellation down to the subchannel call. 3146 if (calld->subchannel_call == nullptr) { 3147 pending_batches_fail(elem, GRPC_ERROR_REF(calld->cancel_error), 3148 false /* yield_call_combiner */); 3149 // Note: This will release the call combiner. 3150 grpc_transport_stream_op_batch_finish_with_failure( 3151 batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner); 3152 } else { 3153 // Note: This will release the call combiner. 3154 grpc_subchannel_call_process_op(calld->subchannel_call, batch); 3155 } 3156 return; 3157 } 3158 // Add the batch to the pending list. 3159 pending_batches_add(elem, batch); 3160 // Check if we've already gotten a subchannel call. 3161 // Note that once we have completed the pick, we do not need to enter 3162 // the channel combiner, which is more efficient (especially for 3163 // streaming calls). 3164 if (calld->subchannel_call != nullptr) { 3165 if (grpc_client_channel_trace.enabled()) { 3166 gpr_log(GPR_INFO, 3167 "chand=%p calld=%p: starting batch on subchannel_call=%p", chand, 3168 calld, calld->subchannel_call); 3169 } 3170 pending_batches_resume(elem); 3171 return; 3172 } 3173 // We do not yet have a subchannel call. 3174 // For batches containing a send_initial_metadata op, enter the channel 3175 // combiner to start a pick. 3176 if (GPR_LIKELY(batch->send_initial_metadata)) { 3177 if (grpc_client_channel_trace.enabled()) { 3178 gpr_log(GPR_INFO, "chand=%p calld=%p: entering client_channel combiner", 3179 chand, calld); 3180 } 3181 GRPC_CLOSURE_SCHED( 3182 GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_pick_locked, 3183 elem, grpc_combiner_scheduler(chand->combiner)), 3184 GRPC_ERROR_NONE); 3185 } else { 3186 // For all other batches, release the call combiner. 3187 if (grpc_client_channel_trace.enabled()) { 3188 gpr_log(GPR_INFO, 3189 "chand=%p calld=%p: saved batch, yielding call combiner", chand, 3190 calld); 3191 } 3192 GRPC_CALL_COMBINER_STOP(calld->call_combiner, 3193 "batch does not include send_initial_metadata"); 3194 } 3195 } 3196 3197 /* Constructor for call_data */ 3198 static grpc_error* cc_init_call_elem(grpc_call_element* elem, 3199 const grpc_call_element_args* args) { 3200 call_data* calld = static_cast<call_data*>(elem->call_data); 3201 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 3202 // Initialize data members. 3203 calld->path = grpc_slice_ref_internal(args->path); 3204 calld->call_start_time = args->start_time; 3205 calld->deadline = args->deadline; 3206 calld->arena = args->arena; 3207 calld->owning_call = args->call_stack; 3208 calld->call_combiner = args->call_combiner; 3209 if (GPR_LIKELY(chand->deadline_checking_enabled)) { 3210 grpc_deadline_state_init(elem, args->call_stack, args->call_combiner, 3211 calld->deadline); 3212 } 3213 calld->enable_retries = chand->enable_retries; 3214 calld->send_messages.Init(); 3215 return GRPC_ERROR_NONE; 3216 } 3217 3218 /* Destructor for call_data */ 3219 static void cc_destroy_call_elem(grpc_call_element* elem, 3220 const grpc_call_final_info* final_info, 3221 grpc_closure* then_schedule_closure) { 3222 call_data* calld = static_cast<call_data*>(elem->call_data); 3223 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 3224 if (GPR_LIKELY(chand->deadline_checking_enabled)) { 3225 grpc_deadline_state_destroy(elem); 3226 } 3227 grpc_slice_unref_internal(calld->path); 3228 calld->retry_throttle_data.reset(); 3229 calld->method_params.reset(); 3230 GRPC_ERROR_UNREF(calld->cancel_error); 3231 if (GPR_LIKELY(calld->subchannel_call != nullptr)) { 3232 grpc_subchannel_call_set_cleanup_closure(calld->subchannel_call, 3233 then_schedule_closure); 3234 then_schedule_closure = nullptr; 3235 GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call, 3236 "client_channel_destroy_call"); 3237 } 3238 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { 3239 GPR_ASSERT(calld->pending_batches[i].batch == nullptr); 3240 } 3241 if (GPR_LIKELY(calld->pick.connected_subchannel != nullptr)) { 3242 calld->pick.connected_subchannel.reset(); 3243 } 3244 for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) { 3245 if (calld->pick.subchannel_call_context[i].value != nullptr) { 3246 calld->pick.subchannel_call_context[i].destroy( 3247 calld->pick.subchannel_call_context[i].value); 3248 } 3249 } 3250 calld->send_messages.Destroy(); 3251 GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE); 3252 } 3253 3254 static void cc_set_pollset_or_pollset_set(grpc_call_element* elem, 3255 grpc_polling_entity* pollent) { 3256 call_data* calld = static_cast<call_data*>(elem->call_data); 3257 calld->pollent = pollent; 3258 } 3259 3260 /************************************************************************* 3261 * EXPORTED SYMBOLS 3262 */ 3263 3264 const grpc_channel_filter grpc_client_channel_filter = { 3265 cc_start_transport_stream_op_batch, 3266 cc_start_transport_op, 3267 sizeof(call_data), 3268 cc_init_call_elem, 3269 cc_set_pollset_or_pollset_set, 3270 cc_destroy_call_elem, 3271 sizeof(channel_data), 3272 cc_init_channel_elem, 3273 cc_destroy_channel_elem, 3274 cc_get_channel_info, 3275 "client-channel", 3276 }; 3277 3278 static void try_to_connect_locked(void* arg, grpc_error* error_ignored) { 3279 channel_data* chand = static_cast<channel_data*>(arg); 3280 if (chand->lb_policy != nullptr) { 3281 chand->lb_policy->ExitIdleLocked(); 3282 } else { 3283 chand->exit_idle_when_lb_policy_arrives = true; 3284 if (!chand->started_resolving && chand->resolver != nullptr) { 3285 start_resolving_locked(chand); 3286 } 3287 } 3288 GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "try_to_connect"); 3289 } 3290 3291 void grpc_client_channel_populate_child_refs( 3292 grpc_channel_element* elem, grpc_core::ChildRefsList* child_subchannels, 3293 grpc_core::ChildRefsList* child_channels) { 3294 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 3295 if (chand->lb_policy != nullptr) { 3296 chand->lb_policy->FillChildRefsForChannelz(child_subchannels, 3297 child_channels); 3298 } 3299 } 3300 3301 grpc_connectivity_state grpc_client_channel_check_connectivity_state( 3302 grpc_channel_element* elem, int try_to_connect) { 3303 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 3304 grpc_connectivity_state out = 3305 grpc_connectivity_state_check(&chand->state_tracker); 3306 if (out == GRPC_CHANNEL_IDLE && try_to_connect) { 3307 GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect"); 3308 GRPC_CLOSURE_SCHED( 3309 GRPC_CLOSURE_CREATE(try_to_connect_locked, chand, 3310 grpc_combiner_scheduler(chand->combiner)), 3311 GRPC_ERROR_NONE); 3312 } 3313 return out; 3314 } 3315 3316 typedef struct external_connectivity_watcher { 3317 channel_data* chand; 3318 grpc_polling_entity pollent; 3319 grpc_closure* on_complete; 3320 grpc_closure* watcher_timer_init; 3321 grpc_connectivity_state* state; 3322 grpc_closure my_closure; 3323 struct external_connectivity_watcher* next; 3324 } external_connectivity_watcher; 3325 3326 static external_connectivity_watcher* lookup_external_connectivity_watcher( 3327 channel_data* chand, grpc_closure* on_complete) { 3328 gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); 3329 external_connectivity_watcher* w = 3330 chand->external_connectivity_watcher_list_head; 3331 while (w != nullptr && w->on_complete != on_complete) { 3332 w = w->next; 3333 } 3334 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); 3335 return w; 3336 } 3337 3338 static void external_connectivity_watcher_list_append( 3339 channel_data* chand, external_connectivity_watcher* w) { 3340 GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete)); 3341 3342 gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu); 3343 GPR_ASSERT(!w->next); 3344 w->next = chand->external_connectivity_watcher_list_head; 3345 chand->external_connectivity_watcher_list_head = w; 3346 gpr_mu_unlock(&w->chand->external_connectivity_watcher_list_mu); 3347 } 3348 3349 static void external_connectivity_watcher_list_remove( 3350 channel_data* chand, external_connectivity_watcher* too_remove) { 3351 GPR_ASSERT( 3352 lookup_external_connectivity_watcher(chand, too_remove->on_complete)); 3353 gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); 3354 if (too_remove == chand->external_connectivity_watcher_list_head) { 3355 chand->external_connectivity_watcher_list_head = too_remove->next; 3356 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); 3357 return; 3358 } 3359 external_connectivity_watcher* w = 3360 chand->external_connectivity_watcher_list_head; 3361 while (w != nullptr) { 3362 if (w->next == too_remove) { 3363 w->next = w->next->next; 3364 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); 3365 return; 3366 } 3367 w = w->next; 3368 } 3369 GPR_UNREACHABLE_CODE(return ); 3370 } 3371 3372 int grpc_client_channel_num_external_connectivity_watchers( 3373 grpc_channel_element* elem) { 3374 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 3375 int count = 0; 3376 3377 gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); 3378 external_connectivity_watcher* w = 3379 chand->external_connectivity_watcher_list_head; 3380 while (w != nullptr) { 3381 count++; 3382 w = w->next; 3383 } 3384 gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); 3385 3386 return count; 3387 } 3388 3389 static void on_external_watch_complete_locked(void* arg, grpc_error* error) { 3390 external_connectivity_watcher* w = 3391 static_cast<external_connectivity_watcher*>(arg); 3392 grpc_closure* follow_up = w->on_complete; 3393 grpc_polling_entity_del_from_pollset_set(&w->pollent, 3394 w->chand->interested_parties); 3395 GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack, 3396 "external_connectivity_watcher"); 3397 external_connectivity_watcher_list_remove(w->chand, w); 3398 gpr_free(w); 3399 GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error)); 3400 } 3401 3402 static void watch_connectivity_state_locked(void* arg, 3403 grpc_error* error_ignored) { 3404 external_connectivity_watcher* w = 3405 static_cast<external_connectivity_watcher*>(arg); 3406 external_connectivity_watcher* found = nullptr; 3407 if (w->state != nullptr) { 3408 external_connectivity_watcher_list_append(w->chand, w); 3409 // An assumption is being made that the closure is scheduled on the exec ctx 3410 // scheduler and that GRPC_CLOSURE_RUN would run the closure immediately. 3411 GRPC_CLOSURE_RUN(w->watcher_timer_init, GRPC_ERROR_NONE); 3412 GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete_locked, w, 3413 grpc_combiner_scheduler(w->chand->combiner)); 3414 grpc_connectivity_state_notify_on_state_change(&w->chand->state_tracker, 3415 w->state, &w->my_closure); 3416 } else { 3417 GPR_ASSERT(w->watcher_timer_init == nullptr); 3418 found = lookup_external_connectivity_watcher(w->chand, w->on_complete); 3419 if (found) { 3420 GPR_ASSERT(found->on_complete == w->on_complete); 3421 grpc_connectivity_state_notify_on_state_change( 3422 &found->chand->state_tracker, nullptr, &found->my_closure); 3423 } 3424 grpc_polling_entity_del_from_pollset_set(&w->pollent, 3425 w->chand->interested_parties); 3426 GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack, 3427 "external_connectivity_watcher"); 3428 gpr_free(w); 3429 } 3430 } 3431 3432 void grpc_client_channel_watch_connectivity_state( 3433 grpc_channel_element* elem, grpc_polling_entity pollent, 3434 grpc_connectivity_state* state, grpc_closure* closure, 3435 grpc_closure* watcher_timer_init) { 3436 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 3437 external_connectivity_watcher* w = 3438 static_cast<external_connectivity_watcher*>(gpr_zalloc(sizeof(*w))); 3439 w->chand = chand; 3440 w->pollent = pollent; 3441 w->on_complete = closure; 3442 w->state = state; 3443 w->watcher_timer_init = watcher_timer_init; 3444 grpc_polling_entity_add_to_pollset_set(&w->pollent, 3445 chand->interested_parties); 3446 GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, 3447 "external_connectivity_watcher"); 3448 GRPC_CLOSURE_SCHED( 3449 GRPC_CLOSURE_INIT(&w->my_closure, watch_connectivity_state_locked, w, 3450 grpc_combiner_scheduler(chand->combiner)), 3451 GRPC_ERROR_NONE); 3452 } 3453 3454 grpc_subchannel_call* grpc_client_channel_get_subchannel_call( 3455 grpc_call_element* elem) { 3456 call_data* calld = static_cast<call_data*>(elem->call_data); 3457 return calld->subchannel_call; 3458 } 3459