1 /* 2 * 3 * Copyright 2017 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #include <grpc/support/port_platform.h> 20 21 #include "src/core/ext/transport/chttp2/transport/flow_control.h" 22 23 #include <inttypes.h> 24 #include <limits.h> 25 #include <math.h> 26 #include <string.h> 27 28 #include <grpc/support/alloc.h> 29 #include <grpc/support/log.h> 30 #include <grpc/support/string_util.h> 31 32 #include "src/core/ext/transport/chttp2/transport/internal.h" 33 #include "src/core/lib/gpr/string.h" 34 35 grpc_core::TraceFlag grpc_flowctl_trace(false, "flowctl"); 36 37 namespace grpc_core { 38 namespace chttp2 { 39 40 namespace { 41 42 static constexpr const int kTracePadding = 30; 43 static constexpr const uint32_t kMaxWindowUpdateSize = (1u << 31) - 1; 44 45 static char* fmt_int64_diff_str(int64_t old_val, int64_t new_val) { 46 char* str; 47 if (old_val != new_val) { 48 gpr_asprintf(&str, "%" PRId64 " -> %" PRId64 "", old_val, new_val); 49 } else { 50 gpr_asprintf(&str, "%" PRId64 "", old_val); 51 } 52 char* str_lp = gpr_leftpad(str, ' ', kTracePadding); 53 gpr_free(str); 54 return str_lp; 55 } 56 57 static char* fmt_uint32_diff_str(uint32_t old_val, uint32_t new_val) { 58 char* str; 59 if (old_val != new_val) { 60 gpr_asprintf(&str, "%" PRIu32 " -> %" PRIu32 "", old_val, new_val); 61 } else { 62 gpr_asprintf(&str, "%" PRIu32 "", old_val); 63 } 64 char* str_lp = gpr_leftpad(str, ' ', kTracePadding); 65 gpr_free(str); 66 return str_lp; 67 } 68 } // namespace 69 70 void FlowControlTrace::Init(const char* reason, TransportFlowControl* tfc, 71 StreamFlowControl* sfc) { 72 tfc_ = tfc; 73 sfc_ = sfc; 74 reason_ = reason; 75 remote_window_ = tfc->remote_window(); 76 target_window_ = tfc->target_window(); 77 announced_window_ = tfc->announced_window(); 78 if (sfc != nullptr) { 79 remote_window_delta_ = sfc->remote_window_delta(); 80 local_window_delta_ = sfc->local_window_delta(); 81 announced_window_delta_ = sfc->announced_window_delta(); 82 } 83 } 84 85 void FlowControlTrace::Finish() { 86 uint32_t acked_local_window = 87 tfc_->transport()->settings[GRPC_SENT_SETTINGS] 88 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; 89 uint32_t remote_window = 90 tfc_->transport()->settings[GRPC_PEER_SETTINGS] 91 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; 92 char* trw_str = fmt_int64_diff_str(remote_window_, tfc_->remote_window()); 93 char* tlw_str = fmt_int64_diff_str(target_window_, tfc_->target_window()); 94 char* taw_str = 95 fmt_int64_diff_str(announced_window_, tfc_->announced_window()); 96 char* srw_str; 97 char* slw_str; 98 char* saw_str; 99 if (sfc_ != nullptr) { 100 srw_str = fmt_int64_diff_str(remote_window_delta_ + remote_window, 101 sfc_->remote_window_delta() + remote_window); 102 slw_str = 103 fmt_int64_diff_str(local_window_delta_ + acked_local_window, 104 sfc_->local_window_delta() + acked_local_window); 105 saw_str = 106 fmt_int64_diff_str(announced_window_delta_ + acked_local_window, 107 sfc_->announced_window_delta() + acked_local_window); 108 } else { 109 srw_str = gpr_leftpad("", ' ', kTracePadding); 110 slw_str = gpr_leftpad("", ' ', kTracePadding); 111 saw_str = gpr_leftpad("", ' ', kTracePadding); 112 } 113 gpr_log(GPR_DEBUG, 114 "%p[%u][%s] | %s | trw:%s, ttw:%s, taw:%s, srw:%s, slw:%s, saw:%s", 115 tfc_, sfc_ != nullptr ? sfc_->stream()->id : 0, 116 tfc_->transport()->is_client ? "cli" : "svr", reason_, trw_str, 117 tlw_str, taw_str, srw_str, slw_str, saw_str); 118 gpr_free(trw_str); 119 gpr_free(tlw_str); 120 gpr_free(taw_str); 121 gpr_free(srw_str); 122 gpr_free(slw_str); 123 gpr_free(saw_str); 124 } 125 126 const char* FlowControlAction::UrgencyString(Urgency u) { 127 switch (u) { 128 case Urgency::NO_ACTION_NEEDED: 129 return "no action"; 130 case Urgency::UPDATE_IMMEDIATELY: 131 return "update immediately"; 132 case Urgency::QUEUE_UPDATE: 133 return "queue update"; 134 default: 135 GPR_UNREACHABLE_CODE(return "unknown"); 136 } 137 GPR_UNREACHABLE_CODE(return "unknown"); 138 } 139 140 void FlowControlAction::Trace(grpc_chttp2_transport* t) const { 141 char* iw_str = fmt_uint32_diff_str( 142 t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], 143 initial_window_size_); 144 char* mf_str = fmt_uint32_diff_str( 145 t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], 146 max_frame_size_); 147 gpr_log(GPR_DEBUG, "t[%s], s[%s], iw:%s:%s mf:%s:%s", 148 UrgencyString(send_transport_update_), 149 UrgencyString(send_stream_update_), 150 UrgencyString(send_initial_window_update_), iw_str, 151 UrgencyString(send_max_frame_size_update_), mf_str); 152 gpr_free(iw_str); 153 gpr_free(mf_str); 154 } 155 156 TransportFlowControlDisabled::TransportFlowControlDisabled( 157 grpc_chttp2_transport* t) { 158 remote_window_ = kMaxWindow; 159 target_initial_window_size_ = kMaxWindow; 160 announced_window_ = kMaxWindow; 161 t->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] = 162 kFrameSize; 163 t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] = 164 kFrameSize; 165 t->settings[GRPC_ACKED_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] = 166 kFrameSize; 167 t->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] = 168 kMaxWindow; 169 t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] = 170 kMaxWindow; 171 t->settings[GRPC_ACKED_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] = 172 kMaxWindow; 173 } 174 175 TransportFlowControl::TransportFlowControl(const grpc_chttp2_transport* t, 176 bool enable_bdp_probe) 177 : t_(t), 178 enable_bdp_probe_(enable_bdp_probe), 179 bdp_estimator_(t->peer_string), 180 pid_controller_(grpc_core::PidController::Args() 181 .set_gain_p(4) 182 .set_gain_i(8) 183 .set_gain_d(0) 184 .set_initial_control_value(TargetLogBdp()) 185 .set_min_control_value(-1) 186 .set_max_control_value(25) 187 .set_integral_range(10)), 188 last_pid_update_(grpc_core::ExecCtx::Get()->Now()) {} 189 190 uint32_t TransportFlowControl::MaybeSendUpdate(bool writing_anyway) { 191 FlowControlTrace trace("t updt sent", this, nullptr); 192 const uint32_t target_announced_window = 193 static_cast<const uint32_t>(target_window()); 194 if ((writing_anyway || announced_window_ <= target_announced_window / 2) && 195 announced_window_ != target_announced_window) { 196 const uint32_t announce = static_cast<uint32_t> GPR_CLAMP( 197 target_announced_window - announced_window_, 0, kMaxWindowUpdateSize); 198 announced_window_ += announce; 199 return announce; 200 } 201 return 0; 202 } 203 204 grpc_error* TransportFlowControl::ValidateRecvData( 205 int64_t incoming_frame_size) { 206 if (incoming_frame_size > announced_window_) { 207 char* msg; 208 gpr_asprintf(&msg, 209 "frame of size %" PRId64 " overflows local window of %" PRId64, 210 incoming_frame_size, announced_window_); 211 grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); 212 gpr_free(msg); 213 return err; 214 } 215 return GRPC_ERROR_NONE; 216 } 217 218 StreamFlowControl::StreamFlowControl(TransportFlowControl* tfc, 219 const grpc_chttp2_stream* s) 220 : tfc_(tfc), s_(s) {} 221 222 grpc_error* StreamFlowControl::RecvData(int64_t incoming_frame_size) { 223 FlowControlTrace trace(" data recv", tfc_, this); 224 225 grpc_error* error = GRPC_ERROR_NONE; 226 error = tfc_->ValidateRecvData(incoming_frame_size); 227 if (error != GRPC_ERROR_NONE) return error; 228 229 uint32_t sent_init_window = 230 tfc_->transport()->settings[GRPC_SENT_SETTINGS] 231 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; 232 uint32_t acked_init_window = 233 tfc_->transport()->settings[GRPC_ACKED_SETTINGS] 234 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; 235 236 int64_t acked_stream_window = announced_window_delta_ + acked_init_window; 237 int64_t sent_stream_window = announced_window_delta_ + sent_init_window; 238 if (incoming_frame_size > acked_stream_window) { 239 if (incoming_frame_size <= sent_stream_window) { 240 gpr_log(GPR_ERROR, 241 "Incoming frame of size %" PRId64 242 " exceeds local window size of %" PRId64 243 ".\n" 244 "The (un-acked, future) window size would be %" PRId64 245 " which is not exceeded.\n" 246 "This would usually cause a disconnection, but allowing it due to" 247 "broken HTTP2 implementations in the wild.\n" 248 "See (for example) https://github.com/netty/netty/issues/6520.", 249 incoming_frame_size, acked_stream_window, sent_stream_window); 250 } else { 251 char* msg; 252 gpr_asprintf( 253 &msg, "frame of size %" PRId64 " overflows local window of %" PRId64, 254 incoming_frame_size, acked_stream_window); 255 grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); 256 gpr_free(msg); 257 return err; 258 } 259 } 260 261 UpdateAnnouncedWindowDelta(tfc_, -incoming_frame_size); 262 local_window_delta_ -= incoming_frame_size; 263 tfc_->CommitRecvData(incoming_frame_size); 264 return GRPC_ERROR_NONE; 265 } 266 267 uint32_t StreamFlowControl::MaybeSendUpdate() { 268 FlowControlTrace trace("s updt sent", tfc_, this); 269 if (local_window_delta_ > announced_window_delta_) { 270 uint32_t announce = static_cast<uint32_t> GPR_CLAMP( 271 local_window_delta_ - announced_window_delta_, 0, kMaxWindowUpdateSize); 272 UpdateAnnouncedWindowDelta(tfc_, announce); 273 return announce; 274 } 275 return 0; 276 } 277 278 void StreamFlowControl::IncomingByteStreamUpdate(size_t max_size_hint, 279 size_t have_already) { 280 FlowControlTrace trace("app st recv", tfc_, this); 281 uint32_t max_recv_bytes; 282 uint32_t sent_init_window = 283 tfc_->transport()->settings[GRPC_SENT_SETTINGS] 284 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; 285 286 /* clamp max recv hint to an allowable size */ 287 if (max_size_hint >= UINT32_MAX - sent_init_window) { 288 max_recv_bytes = UINT32_MAX - sent_init_window; 289 } else { 290 max_recv_bytes = static_cast<uint32_t>(max_size_hint); 291 } 292 293 /* account for bytes already received but unknown to higher layers */ 294 if (max_recv_bytes >= have_already) { 295 max_recv_bytes -= static_cast<uint32_t>(have_already); 296 } else { 297 max_recv_bytes = 0; 298 } 299 300 /* add some small lookahead to keep pipelines flowing */ 301 GPR_ASSERT(max_recv_bytes <= UINT32_MAX - sent_init_window); 302 if (local_window_delta_ < max_recv_bytes) { 303 uint32_t add_max_recv_bytes = 304 static_cast<uint32_t>(max_recv_bytes - local_window_delta_); 305 local_window_delta_ += add_max_recv_bytes; 306 } 307 } 308 309 // Take in a target and modifies it based on the memory pressure of the system 310 static double AdjustForMemoryPressure(grpc_resource_quota* quota, 311 double target) { 312 // do not increase window under heavy memory pressure. 313 double memory_pressure = grpc_resource_quota_get_memory_pressure(quota); 314 static const double kLowMemPressure = 0.1; 315 static const double kZeroTarget = 22; 316 static const double kHighMemPressure = 0.8; 317 static const double kMaxMemPressure = 0.9; 318 if (memory_pressure < kLowMemPressure && target < kZeroTarget) { 319 target = (target - kZeroTarget) * memory_pressure / kLowMemPressure + 320 kZeroTarget; 321 } else if (memory_pressure > kHighMemPressure) { 322 target *= 1 - GPR_MIN(1, (memory_pressure - kHighMemPressure) / 323 (kMaxMemPressure - kHighMemPressure)); 324 } 325 return target; 326 } 327 328 double TransportFlowControl::TargetLogBdp() { 329 return AdjustForMemoryPressure( 330 grpc_resource_user_quota(grpc_endpoint_get_resource_user(t_->ep)), 331 1 + log2(bdp_estimator_.EstimateBdp())); 332 } 333 334 double TransportFlowControl::SmoothLogBdp(double value) { 335 grpc_millis now = grpc_core::ExecCtx::Get()->Now(); 336 double bdp_error = value - pid_controller_.last_control_value(); 337 const double dt = static_cast<double>(now - last_pid_update_) * 1e-3; 338 last_pid_update_ = now; 339 // Limit dt to 100ms 340 const double kMaxDt = 0.1; 341 return pid_controller_.Update(bdp_error, dt > kMaxDt ? kMaxDt : dt); 342 } 343 344 FlowControlAction::Urgency TransportFlowControl::DeltaUrgency( 345 int64_t value, grpc_chttp2_setting_id setting_id) { 346 int64_t delta = value - static_cast<int64_t>( 347 t_->settings[GRPC_LOCAL_SETTINGS][setting_id]); 348 // TODO(ncteisen): tune this 349 if (delta != 0 && (delta <= -value / 5 || delta >= value / 5)) { 350 return FlowControlAction::Urgency::QUEUE_UPDATE; 351 } else { 352 return FlowControlAction::Urgency::NO_ACTION_NEEDED; 353 } 354 } 355 356 FlowControlAction TransportFlowControl::PeriodicUpdate() { 357 FlowControlAction action; 358 if (enable_bdp_probe_) { 359 // get bdp estimate and update initial_window accordingly. 360 // target might change based on how much memory pressure we are under 361 // TODO(ncteisen): experiment with setting target to be huge under low 362 // memory pressure. 363 const double target = pow(2, SmoothLogBdp(TargetLogBdp())); 364 365 // Though initial window 'could' drop to 0, we keep the floor at 128 366 target_initial_window_size_ = 367 static_cast<int32_t> GPR_CLAMP(target, 128, INT32_MAX); 368 369 action.set_send_initial_window_update( 370 DeltaUrgency(target_initial_window_size_, 371 GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE), 372 static_cast<uint32_t>(target_initial_window_size_)); 373 374 // get bandwidth estimate and update max_frame accordingly. 375 double bw_dbl = bdp_estimator_.EstimateBandwidth(); 376 // we target the max of BDP or bandwidth in microseconds. 377 int32_t frame_size = static_cast<int32_t> GPR_CLAMP( 378 GPR_MAX((int32_t)GPR_CLAMP(bw_dbl, 0, INT_MAX) / 1000, 379 target_initial_window_size_), 380 16384, 16777215); 381 action.set_send_max_frame_size_update( 382 DeltaUrgency(static_cast<int64_t>(frame_size), 383 GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE), 384 frame_size); 385 } 386 return UpdateAction(action); 387 } 388 389 FlowControlAction StreamFlowControl::UpdateAction(FlowControlAction action) { 390 // TODO(ncteisen): tune this 391 if (!s_->read_closed) { 392 uint32_t sent_init_window = 393 tfc_->transport()->settings[GRPC_SENT_SETTINGS] 394 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; 395 if (local_window_delta_ > announced_window_delta_ && 396 announced_window_delta_ + sent_init_window <= sent_init_window / 2) { 397 action.set_send_stream_update( 398 FlowControlAction::Urgency::UPDATE_IMMEDIATELY); 399 } else if (local_window_delta_ > announced_window_delta_) { 400 action.set_send_stream_update(FlowControlAction::Urgency::QUEUE_UPDATE); 401 } 402 } 403 404 return action; 405 } 406 407 } // namespace chttp2 408 } // namespace grpc_core 409