1 /* 2 * 3 * Copyright 2016 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 /* Benchmark gRPC end2end in various configurations */ 20 21 #include <benchmark/benchmark.h> 22 #include <gflags/gflags.h> 23 #include <fstream> 24 25 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" 26 #include "src/core/ext/transport/chttp2/transport/internal.h" 27 #include "src/core/lib/iomgr/timer_manager.h" 28 #include "src/core/lib/profiling/timers.h" 29 #include "src/proto/grpc/testing/echo.grpc.pb.h" 30 #include "test/core/util/trickle_endpoint.h" 31 #include "test/cpp/microbenchmarks/fullstack_context_mutators.h" 32 #include "test/cpp/microbenchmarks/fullstack_fixtures.h" 33 #include "test/cpp/util/test_config.h" 34 35 DEFINE_bool(log, false, "Log state to CSV files"); 36 DEFINE_int32( 37 warmup_megabytes, 1, 38 "Number of megabytes to pump before collecting flow control stats"); 39 DEFINE_int32( 40 warmup_iterations, 100, 41 "Number of iterations to run before collecting flow control stats"); 42 DEFINE_int32(warmup_max_time_seconds, 10, 43 "Maximum number of seconds to run warmup loop"); 44 45 namespace grpc { 46 namespace testing { 47 48 gpr_atm g_now_us = 0; 49 50 static gpr_timespec fake_now(gpr_clock_type clock_type) { 51 gpr_timespec t; 52 gpr_atm now = gpr_atm_no_barrier_load(&g_now_us); 53 t.tv_sec = now / GPR_US_PER_SEC; 54 t.tv_nsec = (now % GPR_US_PER_SEC) * GPR_NS_PER_US; 55 t.clock_type = clock_type; 56 return t; 57 } 58 59 static void inc_time() { 60 gpr_atm_no_barrier_fetch_add(&g_now_us, 100); 61 grpc_timer_manager_tick(); 62 } 63 64 static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); } 65 66 template <class A0> 67 static void write_csv(std::ostream* out, A0&& a0) { 68 if (!out) return; 69 (*out) << a0 << "\n"; 70 } 71 72 template <class A0, class... Arg> 73 static void write_csv(std::ostream* out, A0&& a0, Arg&&... arg) { 74 if (!out) return; 75 (*out) << a0 << ","; 76 write_csv(out, std::forward<Arg>(arg)...); 77 } 78 79 class TrickledCHTTP2 : public EndpointPairFixture { 80 public: 81 TrickledCHTTP2(Service* service, bool streaming, size_t req_size, 82 size_t resp_size, size_t kilobits_per_second, 83 grpc_passthru_endpoint_stats* stats) 84 : EndpointPairFixture(service, MakeEndpoints(kilobits_per_second, stats), 85 FixtureConfiguration()), 86 stats_(stats) { 87 if (FLAGS_log) { 88 std::ostringstream fn; 89 fn << "trickle." << (streaming ? "streaming" : "unary") << "." << req_size 90 << "." << resp_size << "." << kilobits_per_second << ".csv"; 91 log_.reset(new std::ofstream(fn.str().c_str())); 92 write_csv(log_.get(), "t", "iteration", "client_backlog", 93 "server_backlog", "client_t_stall", "client_s_stall", 94 "server_t_stall", "server_s_stall", "client_t_remote", 95 "server_t_remote", "client_t_announced", "server_t_announced", 96 "client_s_remote_delta", "server_s_remote_delta", 97 "client_s_local_delta", "server_s_local_delta", 98 "client_s_announced_delta", "server_s_announced_delta", 99 "client_peer_iws", "client_local_iws", "client_sent_iws", 100 "client_acked_iws", "server_peer_iws", "server_local_iws", 101 "server_sent_iws", "server_acked_iws", "client_queued_bytes", 102 "server_queued_bytes"); 103 } 104 } 105 106 virtual ~TrickledCHTTP2() { 107 if (stats_ != nullptr) { 108 grpc_passthru_endpoint_stats_destroy(stats_); 109 } 110 } 111 112 void AddToLabel(std::ostream& out, benchmark::State& state) { 113 out << " writes/iter:" 114 << ((double)stats_->num_writes / (double)state.iterations()) 115 << " cli_transport_stalls/iter:" 116 << ((double) 117 client_stats_.streams_stalled_due_to_transport_flow_control / 118 (double)state.iterations()) 119 << " cli_stream_stalls/iter:" 120 << ((double)client_stats_.streams_stalled_due_to_stream_flow_control / 121 (double)state.iterations()) 122 << " svr_transport_stalls/iter:" 123 << ((double) 124 server_stats_.streams_stalled_due_to_transport_flow_control / 125 (double)state.iterations()) 126 << " svr_stream_stalls/iter:" 127 << ((double)server_stats_.streams_stalled_due_to_stream_flow_control / 128 (double)state.iterations()); 129 } 130 131 void Log(int64_t iteration) GPR_ATTRIBUTE_NO_TSAN { 132 auto now = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), start_); 133 grpc_chttp2_transport* client = 134 reinterpret_cast<grpc_chttp2_transport*>(client_transport_); 135 grpc_chttp2_transport* server = 136 reinterpret_cast<grpc_chttp2_transport*>(server_transport_); 137 grpc_chttp2_stream* client_stream = 138 client->stream_map.count == 1 139 ? static_cast<grpc_chttp2_stream*>(client->stream_map.values[0]) 140 : nullptr; 141 grpc_chttp2_stream* server_stream = 142 server->stream_map.count == 1 143 ? static_cast<grpc_chttp2_stream*>(server->stream_map.values[0]) 144 : nullptr; 145 write_csv( 146 log_.get(), 147 static_cast<double>(now.tv_sec) + 148 1e-9 * static_cast<double>(now.tv_nsec), 149 iteration, grpc_trickle_get_backlog(endpoint_pair_.client), 150 grpc_trickle_get_backlog(endpoint_pair_.server), 151 client->lists[GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT].head != nullptr, 152 client->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr, 153 server->lists[GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT].head != nullptr, 154 server->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr, 155 client->flow_control->remote_window_, 156 server->flow_control->remote_window_, 157 client->flow_control->announced_window_, 158 server->flow_control->announced_window_, 159 client_stream ? client_stream->flow_control->remote_window_delta_ : -1, 160 server_stream ? server_stream->flow_control->remote_window_delta_ : -1, 161 client_stream ? client_stream->flow_control->local_window_delta_ : -1, 162 server_stream ? server_stream->flow_control->local_window_delta_ : -1, 163 client_stream ? client_stream->flow_control->announced_window_delta_ 164 : -1, 165 server_stream ? server_stream->flow_control->announced_window_delta_ 166 : -1, 167 client->settings[GRPC_PEER_SETTINGS] 168 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], 169 client->settings[GRPC_LOCAL_SETTINGS] 170 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], 171 client->settings[GRPC_SENT_SETTINGS] 172 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], 173 client->settings[GRPC_ACKED_SETTINGS] 174 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], 175 server->settings[GRPC_PEER_SETTINGS] 176 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], 177 server->settings[GRPC_LOCAL_SETTINGS] 178 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], 179 server->settings[GRPC_SENT_SETTINGS] 180 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], 181 server->settings[GRPC_ACKED_SETTINGS] 182 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], 183 client_stream ? client_stream->flow_controlled_buffer.length : 0, 184 server_stream ? server_stream->flow_controlled_buffer.length : 0); 185 } 186 187 void Step(bool update_stats) { 188 grpc_core::ExecCtx exec_ctx; 189 inc_time(); 190 size_t client_backlog = 191 grpc_trickle_endpoint_trickle(endpoint_pair_.client); 192 size_t server_backlog = 193 grpc_trickle_endpoint_trickle(endpoint_pair_.server); 194 195 if (update_stats) { 196 UpdateStats((grpc_chttp2_transport*)client_transport_, &client_stats_, 197 client_backlog); 198 UpdateStats((grpc_chttp2_transport*)server_transport_, &server_stats_, 199 server_backlog); 200 } 201 } 202 203 private: 204 grpc_passthru_endpoint_stats* stats_; 205 struct Stats { 206 int streams_stalled_due_to_stream_flow_control = 0; 207 int streams_stalled_due_to_transport_flow_control = 0; 208 }; 209 Stats client_stats_; 210 Stats server_stats_; 211 std::unique_ptr<std::ofstream> log_; 212 gpr_timespec start_ = gpr_now(GPR_CLOCK_MONOTONIC); 213 214 static grpc_endpoint_pair MakeEndpoints(size_t kilobits, 215 grpc_passthru_endpoint_stats* stats) { 216 grpc_endpoint_pair p; 217 grpc_passthru_endpoint_create(&p.client, &p.server, Library::get().rq(), 218 stats); 219 double bytes_per_second = 125.0 * kilobits; 220 p.client = grpc_trickle_endpoint_create(p.client, bytes_per_second); 221 p.server = grpc_trickle_endpoint_create(p.server, bytes_per_second); 222 return p; 223 } 224 225 void UpdateStats(grpc_chttp2_transport* t, Stats* s, 226 size_t backlog) GPR_ATTRIBUTE_NO_TSAN { 227 if (backlog == 0) { 228 if (t->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr) { 229 s->streams_stalled_due_to_stream_flow_control++; 230 } 231 if (t->lists[GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT].head != nullptr) { 232 s->streams_stalled_due_to_transport_flow_control++; 233 } 234 } 235 } 236 }; 237 238 // force library initialization 239 auto& force_library_initialization = Library::get(); 240 241 static void TrickleCQNext(TrickledCHTTP2* fixture, void** t, bool* ok, 242 int64_t iteration) { 243 while (true) { 244 fixture->Log(iteration); 245 switch ( 246 fixture->cq()->AsyncNext(t, ok, gpr_inf_past(GPR_CLOCK_MONOTONIC))) { 247 case CompletionQueue::TIMEOUT: 248 fixture->Step(iteration != -1); 249 break; 250 case CompletionQueue::SHUTDOWN: 251 GPR_ASSERT(false); 252 break; 253 case CompletionQueue::GOT_EVENT: 254 return; 255 } 256 } 257 } 258 259 static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) { 260 EchoTestService::AsyncService service; 261 std::unique_ptr<TrickledCHTTP2> fixture(new TrickledCHTTP2( 262 &service, true, state.range(0) /* req_size */, 263 state.range(0) /* resp_size */, state.range(1) /* bw in kbit/s */, 264 grpc_passthru_endpoint_stats_create())); 265 { 266 EchoResponse send_response; 267 EchoResponse recv_response; 268 if (state.range(0) > 0) { 269 send_response.set_message(std::string(state.range(0), 'a')); 270 } 271 Status recv_status; 272 ServerContext svr_ctx; 273 ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); 274 service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), 275 fixture->cq(), tag(0)); 276 std::unique_ptr<EchoTestService::Stub> stub( 277 EchoTestService::NewStub(fixture->channel())); 278 ClientContext cli_ctx; 279 auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); 280 int need_tags = (1 << 0) | (1 << 1); 281 void* t; 282 bool ok; 283 while (need_tags) { 284 TrickleCQNext(fixture.get(), &t, &ok, -1); 285 GPR_ASSERT(ok); 286 int i = (int)(intptr_t)t; 287 GPR_ASSERT(need_tags & (1 << i)); 288 need_tags &= ~(1 << i); 289 } 290 request_rw->Read(&recv_response, tag(0)); 291 auto inner_loop = [&](bool in_warmup) { 292 GPR_TIMER_SCOPE("BenchmarkCycle", 0); 293 response_rw.Write(send_response, tag(1)); 294 while (true) { 295 TrickleCQNext(fixture.get(), &t, &ok, 296 in_warmup ? -1 : state.iterations()); 297 if (t == tag(0)) { 298 request_rw->Read(&recv_response, tag(0)); 299 } else if (t == tag(1)) { 300 break; 301 } else { 302 GPR_ASSERT(false); 303 } 304 } 305 }; 306 gpr_timespec warmup_start = gpr_now(GPR_CLOCK_MONOTONIC); 307 for (int i = 0; 308 i < GPR_MAX(FLAGS_warmup_iterations, FLAGS_warmup_megabytes * 1024 * 309 1024 / (14 + state.range(0))); 310 i++) { 311 inner_loop(true); 312 if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), warmup_start), 313 gpr_time_from_seconds(FLAGS_warmup_max_time_seconds, 314 GPR_TIMESPAN)) > 0) { 315 break; 316 } 317 } 318 while (state.KeepRunning()) { 319 inner_loop(false); 320 } 321 response_rw.Finish(Status::OK, tag(1)); 322 grpc::Status status; 323 request_rw->Finish(&status, tag(2)); 324 need_tags = (1 << 0) | (1 << 1) | (1 << 2); 325 while (need_tags) { 326 TrickleCQNext(fixture.get(), &t, &ok, -1); 327 if (t == tag(0) && ok) { 328 request_rw->Read(&recv_response, tag(0)); 329 continue; 330 } 331 int i = (int)(intptr_t)t; 332 GPR_ASSERT(need_tags & (1 << i)); 333 need_tags &= ~(1 << i); 334 } 335 } 336 fixture->Finish(state); 337 fixture.reset(); 338 state.SetBytesProcessed(state.range(0) * state.iterations()); 339 } 340 341 static void StreamingTrickleArgs(benchmark::internal::Benchmark* b) { 342 for (int i = 1; i <= 128 * 1024 * 1024; i *= 8) { 343 for (int j = 64; j <= 128 * 1024 * 1024; j *= 8) { 344 double expected_time = 345 static_cast<double>(14 + i) / (125.0 * static_cast<double>(j)); 346 if (expected_time > 2.0) continue; 347 b->Args({i, j}); 348 } 349 } 350 } 351 BENCHMARK(BM_PumpStreamServerToClient_Trickle)->Apply(StreamingTrickleArgs); 352 353 static void BM_PumpUnbalancedUnary_Trickle(benchmark::State& state) { 354 EchoTestService::AsyncService service; 355 std::unique_ptr<TrickledCHTTP2> fixture(new TrickledCHTTP2( 356 &service, false, state.range(0) /* req_size */, 357 state.range(1) /* resp_size */, state.range(2) /* bw in kbit/s */, 358 grpc_passthru_endpoint_stats_create())); 359 EchoRequest send_request; 360 EchoResponse send_response; 361 EchoResponse recv_response; 362 if (state.range(0) > 0) { 363 send_request.set_message(std::string(state.range(0), 'a')); 364 } 365 if (state.range(1) > 0) { 366 send_response.set_message(std::string(state.range(1), 'a')); 367 } 368 Status recv_status; 369 struct ServerEnv { 370 ServerContext ctx; 371 EchoRequest recv_request; 372 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer; 373 ServerEnv() : response_writer(&ctx) {} 374 }; 375 uint8_t server_env_buffer[2 * sizeof(ServerEnv)]; 376 ServerEnv* server_env[2] = { 377 reinterpret_cast<ServerEnv*>(server_env_buffer), 378 reinterpret_cast<ServerEnv*>(server_env_buffer + sizeof(ServerEnv))}; 379 new (server_env[0]) ServerEnv; 380 new (server_env[1]) ServerEnv; 381 service.RequestEcho(&server_env[0]->ctx, &server_env[0]->recv_request, 382 &server_env[0]->response_writer, fixture->cq(), 383 fixture->cq(), tag(0)); 384 service.RequestEcho(&server_env[1]->ctx, &server_env[1]->recv_request, 385 &server_env[1]->response_writer, fixture->cq(), 386 fixture->cq(), tag(1)); 387 std::unique_ptr<EchoTestService::Stub> stub( 388 EchoTestService::NewStub(fixture->channel())); 389 auto inner_loop = [&](bool in_warmup) { 390 GPR_TIMER_SCOPE("BenchmarkCycle", 0); 391 recv_response.Clear(); 392 ClientContext cli_ctx; 393 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( 394 stub->AsyncEcho(&cli_ctx, send_request, fixture->cq())); 395 void* t; 396 bool ok; 397 response_reader->Finish(&recv_response, &recv_status, tag(4)); 398 TrickleCQNext(fixture.get(), &t, &ok, in_warmup ? -1 : state.iterations()); 399 GPR_ASSERT(ok); 400 GPR_ASSERT(t == tag(0) || t == tag(1)); 401 intptr_t slot = reinterpret_cast<intptr_t>(t); 402 ServerEnv* senv = server_env[slot]; 403 senv->response_writer.Finish(send_response, Status::OK, tag(3)); 404 for (int i = (1 << 3) | (1 << 4); i != 0;) { 405 TrickleCQNext(fixture.get(), &t, &ok, 406 in_warmup ? -1 : state.iterations()); 407 GPR_ASSERT(ok); 408 int tagnum = (int)reinterpret_cast<intptr_t>(t); 409 GPR_ASSERT(i & (1 << tagnum)); 410 i -= 1 << tagnum; 411 } 412 GPR_ASSERT(recv_status.ok()); 413 414 senv->~ServerEnv(); 415 senv = new (senv) ServerEnv(); 416 service.RequestEcho(&senv->ctx, &senv->recv_request, &senv->response_writer, 417 fixture->cq(), fixture->cq(), tag(slot)); 418 }; 419 gpr_timespec warmup_start = gpr_now(GPR_CLOCK_MONOTONIC); 420 for (int i = 0; 421 i < GPR_MAX(FLAGS_warmup_iterations, FLAGS_warmup_megabytes * 1024 * 422 1024 / (14 + state.range(0))); 423 i++) { 424 inner_loop(true); 425 if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), warmup_start), 426 gpr_time_from_seconds(FLAGS_warmup_max_time_seconds, 427 GPR_TIMESPAN)) > 0) { 428 break; 429 } 430 } 431 while (state.KeepRunning()) { 432 inner_loop(false); 433 } 434 fixture->Finish(state); 435 fixture.reset(); 436 server_env[0]->~ServerEnv(); 437 server_env[1]->~ServerEnv(); 438 state.SetBytesProcessed(state.range(0) * state.iterations() + 439 state.range(1) * state.iterations()); 440 } 441 442 static void UnaryTrickleArgs(benchmark::internal::Benchmark* b) { 443 for (int bw = 64; bw <= 128 * 1024 * 1024; bw *= 16) { 444 b->Args({1, 1, bw}); 445 for (int i = 64; i <= 128 * 1024 * 1024; i *= 64) { 446 double expected_time = 447 static_cast<double>(14 + i) / (125.0 * static_cast<double>(bw)); 448 if (expected_time > 2.0) continue; 449 b->Args({i, 1, bw}); 450 b->Args({1, i, bw}); 451 b->Args({i, i, bw}); 452 } 453 } 454 } 455 BENCHMARK(BM_PumpUnbalancedUnary_Trickle)->Apply(UnaryTrickleArgs); 456 } // namespace testing 457 } // namespace grpc 458 459 extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type); 460 461 // Some distros have RunSpecifiedBenchmarks under the benchmark namespace, 462 // and others do not. This allows us to support both modes. 463 namespace benchmark { 464 void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); } 465 } // namespace benchmark 466 467 int main(int argc, char** argv) { 468 ::benchmark::Initialize(&argc, argv); 469 ::grpc::testing::InitTest(&argc, &argv, false); 470 grpc_timer_manager_set_threading(false); 471 gpr_now_impl = ::grpc::testing::fake_now; 472 benchmark::RunTheBenchmarksNamespaced(); 473 } 474