Home | History | Annotate | Download | only in transport
      1 /*
      2  *
      3  * Copyright 2017 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #include <grpc/support/port_platform.h>
     20 
     21 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
     22 
     23 #include <inttypes.h>
     24 #include <limits.h>
     25 #include <math.h>
     26 #include <string.h>
     27 
     28 #include <grpc/support/alloc.h>
     29 #include <grpc/support/log.h>
     30 #include <grpc/support/string_util.h>
     31 
     32 #include "src/core/ext/transport/chttp2/transport/internal.h"
     33 #include "src/core/lib/gpr/string.h"
     34 
// Trace flag for this file, created with the name "flowctl" and `false` as its
// first constructor argument (presumably "disabled by default" -- confirm
// against grpc_core::TraceFlag).
grpc_core::TraceFlag grpc_flowctl_trace(false, "flowctl");
     36 
     37 namespace grpc_core {
     38 namespace chttp2 {
     39 
     40 namespace {
     41 
     42 static constexpr const int kTracePadding = 30;
     43 static constexpr const uint32_t kMaxWindowUpdateSize = (1u << 31) - 1;
     44 
     45 static char* fmt_int64_diff_str(int64_t old_val, int64_t new_val) {
     46   char* str;
     47   if (old_val != new_val) {
     48     gpr_asprintf(&str, "%" PRId64 " -> %" PRId64 "", old_val, new_val);
     49   } else {
     50     gpr_asprintf(&str, "%" PRId64 "", old_val);
     51   }
     52   char* str_lp = gpr_leftpad(str, ' ', kTracePadding);
     53   gpr_free(str);
     54   return str_lp;
     55 }
     56 
     57 static char* fmt_uint32_diff_str(uint32_t old_val, uint32_t new_val) {
     58   char* str;
     59   if (old_val != new_val) {
     60     gpr_asprintf(&str, "%" PRIu32 " -> %" PRIu32 "", old_val, new_val);
     61   } else {
     62     gpr_asprintf(&str, "%" PRIu32 "", old_val);
     63   }
     64   char* str_lp = gpr_leftpad(str, ' ', kTracePadding);
     65   gpr_free(str);
     66   return str_lp;
     67 }
     68 }  // namespace
     69 
     70 void FlowControlTrace::Init(const char* reason, TransportFlowControl* tfc,
     71                             StreamFlowControl* sfc) {
     72   tfc_ = tfc;
     73   sfc_ = sfc;
     74   reason_ = reason;
     75   remote_window_ = tfc->remote_window();
     76   target_window_ = tfc->target_window();
     77   announced_window_ = tfc->announced_window();
     78   if (sfc != nullptr) {
     79     remote_window_delta_ = sfc->remote_window_delta();
     80     local_window_delta_ = sfc->local_window_delta();
     81     announced_window_delta_ = sfc->announced_window_delta();
     82   }
     83 }
     84 
     85 void FlowControlTrace::Finish() {
     86   uint32_t acked_local_window =
     87       tfc_->transport()->settings[GRPC_SENT_SETTINGS]
     88                                  [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
     89   uint32_t remote_window =
     90       tfc_->transport()->settings[GRPC_PEER_SETTINGS]
     91                                  [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
     92   char* trw_str = fmt_int64_diff_str(remote_window_, tfc_->remote_window());
     93   char* tlw_str = fmt_int64_diff_str(target_window_, tfc_->target_window());
     94   char* taw_str =
     95       fmt_int64_diff_str(announced_window_, tfc_->announced_window());
     96   char* srw_str;
     97   char* slw_str;
     98   char* saw_str;
     99   if (sfc_ != nullptr) {
    100     srw_str = fmt_int64_diff_str(remote_window_delta_ + remote_window,
    101                                  sfc_->remote_window_delta() + remote_window);
    102     slw_str =
    103         fmt_int64_diff_str(local_window_delta_ + acked_local_window,
    104                            sfc_->local_window_delta() + acked_local_window);
    105     saw_str =
    106         fmt_int64_diff_str(announced_window_delta_ + acked_local_window,
    107                            sfc_->announced_window_delta() + acked_local_window);
    108   } else {
    109     srw_str = gpr_leftpad("", ' ', kTracePadding);
    110     slw_str = gpr_leftpad("", ' ', kTracePadding);
    111     saw_str = gpr_leftpad("", ' ', kTracePadding);
    112   }
    113   gpr_log(GPR_DEBUG,
    114           "%p[%u][%s] | %s | trw:%s, ttw:%s, taw:%s, srw:%s, slw:%s, saw:%s",
    115           tfc_, sfc_ != nullptr ? sfc_->stream()->id : 0,
    116           tfc_->transport()->is_client ? "cli" : "svr", reason_, trw_str,
    117           tlw_str, taw_str, srw_str, slw_str, saw_str);
    118   gpr_free(trw_str);
    119   gpr_free(tlw_str);
    120   gpr_free(taw_str);
    121   gpr_free(srw_str);
    122   gpr_free(slw_str);
    123   gpr_free(saw_str);
    124 }
    125 
    126 const char* FlowControlAction::UrgencyString(Urgency u) {
    127   switch (u) {
    128     case Urgency::NO_ACTION_NEEDED:
    129       return "no action";
    130     case Urgency::UPDATE_IMMEDIATELY:
    131       return "update immediately";
    132     case Urgency::QUEUE_UPDATE:
    133       return "queue update";
    134     default:
    135       GPR_UNREACHABLE_CODE(return "unknown");
    136   }
    137   GPR_UNREACHABLE_CODE(return "unknown");
    138 }
    139 
    140 void FlowControlAction::Trace(grpc_chttp2_transport* t) const {
    141   char* iw_str = fmt_uint32_diff_str(
    142       t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
    143       initial_window_size_);
    144   char* mf_str = fmt_uint32_diff_str(
    145       t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
    146       max_frame_size_);
    147   gpr_log(GPR_DEBUG, "t[%s],  s[%s], iw:%s:%s mf:%s:%s",
    148           UrgencyString(send_transport_update_),
    149           UrgencyString(send_stream_update_),
    150           UrgencyString(send_initial_window_update_), iw_str,
    151           UrgencyString(send_max_frame_size_update_), mf_str);
    152   gpr_free(iw_str);
    153   gpr_free(mf_str);
    154 }
    155 
    156 TransportFlowControlDisabled::TransportFlowControlDisabled(
    157     grpc_chttp2_transport* t) {
    158   remote_window_ = kMaxWindow;
    159   target_initial_window_size_ = kMaxWindow;
    160   announced_window_ = kMaxWindow;
    161   t->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] =
    162       kFrameSize;
    163   t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] =
    164       kFrameSize;
    165   t->settings[GRPC_ACKED_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] =
    166       kFrameSize;
    167   t->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] =
    168       kMaxWindow;
    169   t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] =
    170       kMaxWindow;
    171   t->settings[GRPC_ACKED_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] =
    172       kMaxWindow;
    173 }
    174 
// Builds the transport-level flow controller for transport `t`.
// `enable_bdp_probe` is stored and later gates the BDP-driven adjustments in
// PeriodicUpdate(). The PID controller is seeded with TargetLogBdp() and its
// output is bounded to [-1, 25]; last_pid_update_ starts at the current
// ExecCtx time.
// NOTE(review): TargetLogBdp() runs inside the member-initializer list and
// reads t_ and bdp_estimator_. This presumably relies on both members being
// declared before pid_controller_ in the class -- confirm against the header.
TransportFlowControl::TransportFlowControl(const grpc_chttp2_transport* t,
                                           bool enable_bdp_probe)
    : t_(t),
      enable_bdp_probe_(enable_bdp_probe),
      bdp_estimator_(t->peer_string),
      pid_controller_(grpc_core::PidController::Args()
                          .set_gain_p(4)
                          .set_gain_i(8)
                          .set_gain_d(0)
                          .set_initial_control_value(TargetLogBdp())
                          .set_min_control_value(-1)
                          .set_max_control_value(25)
                          .set_integral_range(10)),
      last_pid_update_(grpc_core::ExecCtx::Get()->Now()) {}
    189 
    190 uint32_t TransportFlowControl::MaybeSendUpdate(bool writing_anyway) {
    191   FlowControlTrace trace("t updt sent", this, nullptr);
    192   const uint32_t target_announced_window =
    193       static_cast<const uint32_t>(target_window());
    194   if ((writing_anyway || announced_window_ <= target_announced_window / 2) &&
    195       announced_window_ != target_announced_window) {
    196     const uint32_t announce = static_cast<uint32_t> GPR_CLAMP(
    197         target_announced_window - announced_window_, 0, kMaxWindowUpdateSize);
    198     announced_window_ += announce;
    199     return announce;
    200   }
    201   return 0;
    202 }
    203 
    204 grpc_error* TransportFlowControl::ValidateRecvData(
    205     int64_t incoming_frame_size) {
    206   if (incoming_frame_size > announced_window_) {
    207     char* msg;
    208     gpr_asprintf(&msg,
    209                  "frame of size %" PRId64 " overflows local window of %" PRId64,
    210                  incoming_frame_size, announced_window_);
    211     grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
    212     gpr_free(msg);
    213     return err;
    214   }
    215   return GRPC_ERROR_NONE;
    216 }
    217 
// Binds a stream-level flow controller to its owning transport flow
// controller `tfc` and to the stream `s` it tracks. Both pointers are stored
// as-is; nothing else is initialized here.
StreamFlowControl::StreamFlowControl(TransportFlowControl* tfc,
                                     const grpc_chttp2_stream* s)
    : tfc_(tfc), s_(s) {}
    221 
    222 grpc_error* StreamFlowControl::RecvData(int64_t incoming_frame_size) {
    223   FlowControlTrace trace("  data recv", tfc_, this);
    224 
    225   grpc_error* error = GRPC_ERROR_NONE;
    226   error = tfc_->ValidateRecvData(incoming_frame_size);
    227   if (error != GRPC_ERROR_NONE) return error;
    228 
    229   uint32_t sent_init_window =
    230       tfc_->transport()->settings[GRPC_SENT_SETTINGS]
    231                                  [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
    232   uint32_t acked_init_window =
    233       tfc_->transport()->settings[GRPC_ACKED_SETTINGS]
    234                                  [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
    235 
    236   int64_t acked_stream_window = announced_window_delta_ + acked_init_window;
    237   int64_t sent_stream_window = announced_window_delta_ + sent_init_window;
    238   if (incoming_frame_size > acked_stream_window) {
    239     if (incoming_frame_size <= sent_stream_window) {
    240       gpr_log(GPR_ERROR,
    241               "Incoming frame of size %" PRId64
    242               " exceeds local window size of %" PRId64
    243               ".\n"
    244               "The (un-acked, future) window size would be %" PRId64
    245               " which is not exceeded.\n"
    246               "This would usually cause a disconnection, but allowing it due to"
    247               "broken HTTP2 implementations in the wild.\n"
    248               "See (for example) https://github.com/netty/netty/issues/6520.",
    249               incoming_frame_size, acked_stream_window, sent_stream_window);
    250     } else {
    251       char* msg;
    252       gpr_asprintf(
    253           &msg, "frame of size %" PRId64 " overflows local window of %" PRId64,
    254           incoming_frame_size, acked_stream_window);
    255       grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
    256       gpr_free(msg);
    257       return err;
    258     }
    259   }
    260 
    261   UpdateAnnouncedWindowDelta(tfc_, -incoming_frame_size);
    262   local_window_delta_ -= incoming_frame_size;
    263   tfc_->CommitRecvData(incoming_frame_size);
    264   return GRPC_ERROR_NONE;
    265 }
    266 
    267 uint32_t StreamFlowControl::MaybeSendUpdate() {
    268   FlowControlTrace trace("s updt sent", tfc_, this);
    269   if (local_window_delta_ > announced_window_delta_) {
    270     uint32_t announce = static_cast<uint32_t> GPR_CLAMP(
    271         local_window_delta_ - announced_window_delta_, 0, kMaxWindowUpdateSize);
    272     UpdateAnnouncedWindowDelta(tfc_, announce);
    273     return announce;
    274   }
    275   return 0;
    276 }
    277 
    278 void StreamFlowControl::IncomingByteStreamUpdate(size_t max_size_hint,
    279                                                  size_t have_already) {
    280   FlowControlTrace trace("app st recv", tfc_, this);
    281   uint32_t max_recv_bytes;
    282   uint32_t sent_init_window =
    283       tfc_->transport()->settings[GRPC_SENT_SETTINGS]
    284                                  [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
    285 
    286   /* clamp max recv hint to an allowable size */
    287   if (max_size_hint >= UINT32_MAX - sent_init_window) {
    288     max_recv_bytes = UINT32_MAX - sent_init_window;
    289   } else {
    290     max_recv_bytes = static_cast<uint32_t>(max_size_hint);
    291   }
    292 
    293   /* account for bytes already received but unknown to higher layers */
    294   if (max_recv_bytes >= have_already) {
    295     max_recv_bytes -= static_cast<uint32_t>(have_already);
    296   } else {
    297     max_recv_bytes = 0;
    298   }
    299 
    300   /* add some small lookahead to keep pipelines flowing */
    301   GPR_ASSERT(max_recv_bytes <= UINT32_MAX - sent_init_window);
    302   if (local_window_delta_ < max_recv_bytes) {
    303     uint32_t add_max_recv_bytes =
    304         static_cast<uint32_t>(max_recv_bytes - local_window_delta_);
    305     local_window_delta_ += add_max_recv_bytes;
    306   }
    307 }
    308 
    309 // Take in a target and modifies it based on the memory pressure of the system
    310 static double AdjustForMemoryPressure(grpc_resource_quota* quota,
    311                                       double target) {
    312   // do not increase window under heavy memory pressure.
    313   double memory_pressure = grpc_resource_quota_get_memory_pressure(quota);
    314   static const double kLowMemPressure = 0.1;
    315   static const double kZeroTarget = 22;
    316   static const double kHighMemPressure = 0.8;
    317   static const double kMaxMemPressure = 0.9;
    318   if (memory_pressure < kLowMemPressure && target < kZeroTarget) {
    319     target = (target - kZeroTarget) * memory_pressure / kLowMemPressure +
    320              kZeroTarget;
    321   } else if (memory_pressure > kHighMemPressure) {
    322     target *= 1 - GPR_MIN(1, (memory_pressure - kHighMemPressure) /
    323                                  (kMaxMemPressure - kHighMemPressure));
    324   }
    325   return target;
    326 }
    327 
    328 double TransportFlowControl::TargetLogBdp() {
    329   return AdjustForMemoryPressure(
    330       grpc_resource_user_quota(grpc_endpoint_get_resource_user(t_->ep)),
    331       1 + log2(bdp_estimator_.EstimateBdp()));
    332 }
    333 
    334 double TransportFlowControl::SmoothLogBdp(double value) {
    335   grpc_millis now = grpc_core::ExecCtx::Get()->Now();
    336   double bdp_error = value - pid_controller_.last_control_value();
    337   const double dt = static_cast<double>(now - last_pid_update_) * 1e-3;
    338   last_pid_update_ = now;
    339   // Limit dt to 100ms
    340   const double kMaxDt = 0.1;
    341   return pid_controller_.Update(bdp_error, dt > kMaxDt ? kMaxDt : dt);
    342 }
    343 
    344 FlowControlAction::Urgency TransportFlowControl::DeltaUrgency(
    345     int64_t value, grpc_chttp2_setting_id setting_id) {
    346   int64_t delta = value - static_cast<int64_t>(
    347                               t_->settings[GRPC_LOCAL_SETTINGS][setting_id]);
    348   // TODO(ncteisen): tune this
    349   if (delta != 0 && (delta <= -value / 5 || delta >= value / 5)) {
    350     return FlowControlAction::Urgency::QUEUE_UPDATE;
    351   } else {
    352     return FlowControlAction::Urgency::NO_ACTION_NEEDED;
    353   }
    354 }
    355 
    356 FlowControlAction TransportFlowControl::PeriodicUpdate() {
    357   FlowControlAction action;
    358   if (enable_bdp_probe_) {
    359     // get bdp estimate and update initial_window accordingly.
    360     // target might change based on how much memory pressure we are under
    361     // TODO(ncteisen): experiment with setting target to be huge under low
    362     // memory pressure.
    363     const double target = pow(2, SmoothLogBdp(TargetLogBdp()));
    364 
    365     // Though initial window 'could' drop to 0, we keep the floor at 128
    366     target_initial_window_size_ =
    367         static_cast<int32_t> GPR_CLAMP(target, 128, INT32_MAX);
    368 
    369     action.set_send_initial_window_update(
    370         DeltaUrgency(target_initial_window_size_,
    371                      GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE),
    372         static_cast<uint32_t>(target_initial_window_size_));
    373 
    374     // get bandwidth estimate and update max_frame accordingly.
    375     double bw_dbl = bdp_estimator_.EstimateBandwidth();
    376     // we target the max of BDP or bandwidth in microseconds.
    377     int32_t frame_size = static_cast<int32_t> GPR_CLAMP(
    378         GPR_MAX((int32_t)GPR_CLAMP(bw_dbl, 0, INT_MAX) / 1000,
    379                 target_initial_window_size_),
    380         16384, 16777215);
    381     action.set_send_max_frame_size_update(
    382         DeltaUrgency(static_cast<int64_t>(frame_size),
    383                      GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE),
    384         frame_size);
    385   }
    386   return UpdateAction(action);
    387 }
    388 
    389 FlowControlAction StreamFlowControl::UpdateAction(FlowControlAction action) {
    390   // TODO(ncteisen): tune this
    391   if (!s_->read_closed) {
    392     uint32_t sent_init_window =
    393         tfc_->transport()->settings[GRPC_SENT_SETTINGS]
    394                                    [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
    395     if (local_window_delta_ > announced_window_delta_ &&
    396         announced_window_delta_ + sent_init_window <= sent_init_window / 2) {
    397       action.set_send_stream_update(
    398           FlowControlAction::Urgency::UPDATE_IMMEDIATELY);
    399     } else if (local_window_delta_ > announced_window_delta_) {
    400       action.set_send_stream_update(FlowControlAction::Urgency::QUEUE_UPDATE);
    401     }
    402   }
    403 
    404   return action;
    405 }
    406 
    407 }  // namespace chttp2
    408 }  // namespace grpc_core
    409