1 /* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 /* Microbenchmarks around CHTTP2 transport operations */ 20 21 #include <benchmark/benchmark.h> 22 #include <grpc/support/alloc.h> 23 #include <grpc/support/log.h> 24 #include <grpc/support/string_util.h> 25 #include <grpcpp/support/channel_arguments.h> 26 #include <string.h> 27 #include <memory> 28 #include <queue> 29 #include <sstream> 30 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" 31 #include "src/core/ext/transport/chttp2/transport/internal.h" 32 #include "src/core/lib/iomgr/closure.h" 33 #include "src/core/lib/iomgr/resource_quota.h" 34 #include "src/core/lib/slice/slice_internal.h" 35 #include "src/core/lib/transport/static_metadata.h" 36 #include "test/cpp/microbenchmarks/helpers.h" 37 #include "test/cpp/util/test_config.h" 38 39 auto& force_library_initialization = Library::get(); 40 41 //////////////////////////////////////////////////////////////////////////////// 42 // Helper classes 43 // 44 45 class DummyEndpoint : public grpc_endpoint { 46 public: 47 DummyEndpoint() { 48 static const grpc_endpoint_vtable my_vtable = {read, 49 write, 50 add_to_pollset, 51 add_to_pollset_set, 52 delete_from_pollset_set, 53 shutdown, 54 destroy, 55 get_resource_user, 56 get_peer, 57 get_fd}; 58 grpc_endpoint::vtable = &my_vtable; 59 ru_ = grpc_resource_user_create(Library::get().rq(), "dummy_endpoint"); 60 } 61 62 void PushInput(grpc_slice slice) { 63 if (read_cb_ == nullptr) { 64 GPR_ASSERT(!have_slice_); 65 buffered_slice_ = slice; 66 have_slice_ = true; 67 return; 68 } 69 grpc_slice_buffer_add(slices_, slice); 70 GRPC_CLOSURE_SCHED(read_cb_, GRPC_ERROR_NONE); 71 read_cb_ = nullptr; 72 } 73 74 private: 75 grpc_resource_user* ru_; 76 grpc_closure* read_cb_ = nullptr; 77 grpc_slice_buffer* slices_ = nullptr; 78 bool have_slice_ = false; 79 grpc_slice buffered_slice_; 80 81 void QueueRead(grpc_slice_buffer* slices, grpc_closure* cb) { 82 GPR_ASSERT(read_cb_ == nullptr); 83 if (have_slice_) { 84 have_slice_ = false; 85 grpc_slice_buffer_add(slices, buffered_slice_); 86 GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE); 87 return; 88 } 89 read_cb_ = cb; 90 slices_ = slices; 91 } 92 93 static void read(grpc_endpoint* ep, grpc_slice_buffer* slices, 94 grpc_closure* cb) { 95 static_cast<DummyEndpoint*>(ep)->QueueRead(slices, cb); 96 } 97 98 static void write(grpc_endpoint* ep, grpc_slice_buffer* slices, 99 grpc_closure* cb, void* arg) { 100 GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE); 101 } 102 103 static grpc_workqueue* get_workqueue(grpc_endpoint* ep) { return nullptr; } 104 105 static void add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) {} 106 107 static void add_to_pollset_set(grpc_endpoint* ep, grpc_pollset_set* pollset) { 108 } 109 110 static void delete_from_pollset_set(grpc_endpoint* ep, 111 grpc_pollset_set* pollset) {} 112 113 static void shutdown(grpc_endpoint* ep, grpc_error* why) { 114 grpc_resource_user_shutdown(static_cast<DummyEndpoint*>(ep)->ru_); 115 GRPC_CLOSURE_SCHED(static_cast<DummyEndpoint*>(ep)->read_cb_, why); 116 } 117 118 static void destroy(grpc_endpoint* ep) { 119 grpc_resource_user_unref(static_cast<DummyEndpoint*>(ep)->ru_); 120 delete static_cast<DummyEndpoint*>(ep); 121 } 122 123 static grpc_resource_user* get_resource_user(grpc_endpoint* ep) { 124 return static_cast<DummyEndpoint*>(ep)->ru_; 125 } 126 static char* get_peer(grpc_endpoint* ep) { return gpr_strdup("test"); } 127 static int get_fd(grpc_endpoint* ep) { return 0; } 128 }; 129 130 class Fixture { 131 public: 132 Fixture(const grpc::ChannelArguments& args, bool client) { 133 grpc_channel_args c_args = args.c_channel_args(); 134 ep_ = new DummyEndpoint; 135 t_ = grpc_create_chttp2_transport(&c_args, ep_, client); 136 grpc_chttp2_transport_start_reading(t_, nullptr, nullptr); 137 FlushExecCtx(); 138 } 139 140 void FlushExecCtx() { grpc_core::ExecCtx::Get()->Flush(); } 141 142 ~Fixture() { grpc_transport_destroy(t_); } 143 144 grpc_chttp2_transport* chttp2_transport() { 145 return reinterpret_cast<grpc_chttp2_transport*>(t_); 146 } 147 grpc_transport* transport() { return t_; } 148 149 void PushInput(grpc_slice slice) { ep_->PushInput(slice); } 150 151 private: 152 DummyEndpoint* ep_; 153 grpc_transport* t_; 154 }; 155 156 class Closure : public grpc_closure { 157 public: 158 virtual ~Closure() {} 159 }; 160 161 template <class F> 162 std::unique_ptr<Closure> MakeClosure( 163 F f, grpc_closure_scheduler* sched = grpc_schedule_on_exec_ctx) { 164 struct C : public Closure { 165 C(const F& f, grpc_closure_scheduler* sched) : f_(f) { 166 GRPC_CLOSURE_INIT(this, Execute, this, sched); 167 } 168 F f_; 169 static void Execute(void* arg, grpc_error* error) { 170 static_cast<C*>(arg)->f_(error); 171 } 172 }; 173 return std::unique_ptr<Closure>(new C(f, sched)); 174 } 175 176 template <class F> 177 grpc_closure* MakeOnceClosure( 178 F f, grpc_closure_scheduler* sched = grpc_schedule_on_exec_ctx) { 179 struct C : public grpc_closure { 180 C(const F& f) : f_(f) {} 181 F f_; 182 static void Execute(void* arg, grpc_error* error) { 183 static_cast<C*>(arg)->f_(error); 184 delete static_cast<C*>(arg); 185 } 186 }; 187 auto* c = new C{f}; 188 return GRPC_CLOSURE_INIT(c, C::Execute, c, sched); 189 } 190 191 class Stream { 192 public: 193 Stream(Fixture* f) : f_(f) { 194 stream_size_ = grpc_transport_stream_size(f->transport()); 195 stream_ = gpr_malloc(stream_size_); 196 arena_ = gpr_arena_create(4096); 197 } 198 199 ~Stream() { 200 gpr_event_wait(&done_, gpr_inf_future(GPR_CLOCK_REALTIME)); 201 gpr_free(stream_); 202 gpr_arena_destroy(arena_); 203 } 204 205 void Init(benchmark::State& state) { 206 GRPC_STREAM_REF_INIT(&refcount_, 1, &Stream::FinishDestroy, this, 207 "test_stream"); 208 gpr_event_init(&done_); 209 memset(stream_, 0, stream_size_); 210 if ((state.iterations() & 0xffff) == 0) { 211 gpr_arena_destroy(arena_); 212 arena_ = gpr_arena_create(4096); 213 } 214 grpc_transport_init_stream(f_->transport(), 215 static_cast<grpc_stream*>(stream_), &refcount_, 216 nullptr, arena_); 217 } 218 219 void DestroyThen(grpc_closure* closure) { 220 destroy_closure_ = closure; 221 #ifndef NDEBUG 222 grpc_stream_unref(&refcount_, "DestroyThen"); 223 #else 224 grpc_stream_unref(&refcount_); 225 #endif 226 } 227 228 void Op(grpc_transport_stream_op_batch* op) { 229 grpc_transport_perform_stream_op(f_->transport(), 230 static_cast<grpc_stream*>(stream_), op); 231 } 232 233 grpc_chttp2_stream* chttp2_stream() { 234 return static_cast<grpc_chttp2_stream*>(stream_); 235 } 236 237 private: 238 static void FinishDestroy(void* arg, grpc_error* error) { 239 auto stream = static_cast<Stream*>(arg); 240 grpc_transport_destroy_stream(stream->f_->transport(), 241 static_cast<grpc_stream*>(stream->stream_), 242 stream->destroy_closure_); 243 gpr_event_set(&stream->done_, (void*)1); 244 } 245 246 Fixture* f_; 247 grpc_stream_refcount refcount_; 248 gpr_arena* arena_; 249 size_t stream_size_; 250 void* stream_; 251 grpc_closure* destroy_closure_ = nullptr; 252 gpr_event done_; 253 }; 254 255 //////////////////////////////////////////////////////////////////////////////// 256 // Benchmarks 257 // 258 259 static void BM_StreamCreateDestroy(benchmark::State& state) { 260 TrackCounters track_counters; 261 grpc_core::ExecCtx exec_ctx; 262 Fixture f(grpc::ChannelArguments(), true); 263 Stream s(&f); 264 grpc_transport_stream_op_batch op; 265 grpc_transport_stream_op_batch_payload op_payload; 266 memset(&op, 0, sizeof(op)); 267 op.cancel_stream = true; 268 op.payload = &op_payload; 269 op_payload.cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; 270 std::unique_ptr<Closure> next = MakeClosure([&](grpc_error* error) { 271 if (!state.KeepRunning()) return; 272 s.Init(state); 273 s.Op(&op); 274 s.DestroyThen(next.get()); 275 }); 276 GRPC_CLOSURE_RUN(next.get(), GRPC_ERROR_NONE); 277 f.FlushExecCtx(); 278 track_counters.Finish(state); 279 } 280 BENCHMARK(BM_StreamCreateDestroy); 281 282 class RepresentativeClientInitialMetadata { 283 public: 284 static std::vector<grpc_mdelem> GetElems() { 285 return { 286 GRPC_MDELEM_SCHEME_HTTP, 287 GRPC_MDELEM_METHOD_POST, 288 grpc_mdelem_from_slices( 289 GRPC_MDSTR_PATH, 290 grpc_slice_intern(grpc_slice_from_static_string("/foo/bar"))), 291 grpc_mdelem_from_slices(GRPC_MDSTR_AUTHORITY, 292 grpc_slice_intern(grpc_slice_from_static_string( 293 "foo.test.google.fr:1234"))), 294 GRPC_MDELEM_GRPC_ACCEPT_ENCODING_IDENTITY_COMMA_DEFLATE_COMMA_GZIP, 295 GRPC_MDELEM_TE_TRAILERS, 296 GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC, 297 grpc_mdelem_from_slices( 298 GRPC_MDSTR_USER_AGENT, 299 grpc_slice_intern(grpc_slice_from_static_string( 300 "grpc-c/3.0.0-dev (linux; chttp2; green)")))}; 301 } 302 }; 303 304 template <class Metadata> 305 static void BM_StreamCreateSendInitialMetadataDestroy(benchmark::State& state) { 306 TrackCounters track_counters; 307 grpc_core::ExecCtx exec_ctx; 308 Fixture f(grpc::ChannelArguments(), true); 309 Stream s(&f); 310 grpc_transport_stream_op_batch op; 311 grpc_transport_stream_op_batch_payload op_payload; 312 memset(&op_payload, 0, sizeof(op_payload)); 313 std::unique_ptr<Closure> start; 314 std::unique_ptr<Closure> done; 315 316 auto reset_op = [&]() { 317 memset(&op, 0, sizeof(op)); 318 op.payload = &op_payload; 319 }; 320 321 grpc_metadata_batch b; 322 grpc_metadata_batch_init(&b); 323 b.deadline = GRPC_MILLIS_INF_FUTURE; 324 std::vector<grpc_mdelem> elems = Metadata::GetElems(); 325 std::vector<grpc_linked_mdelem> storage(elems.size()); 326 for (size_t i = 0; i < elems.size(); i++) { 327 GPR_ASSERT(GRPC_LOG_IF_ERROR( 328 "addmd", grpc_metadata_batch_add_tail(&b, &storage[i], elems[i]))); 329 } 330 331 f.FlushExecCtx(); 332 start = MakeClosure([&](grpc_error* error) { 333 if (!state.KeepRunning()) return; 334 s.Init(state); 335 reset_op(); 336 op.on_complete = done.get(); 337 op.send_initial_metadata = true; 338 op.payload->send_initial_metadata.send_initial_metadata = &b; 339 s.Op(&op); 340 }); 341 done = MakeClosure([&](grpc_error* error) { 342 reset_op(); 343 op.cancel_stream = true; 344 op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; 345 s.Op(&op); 346 s.DestroyThen(start.get()); 347 }); 348 GRPC_CLOSURE_SCHED(start.get(), GRPC_ERROR_NONE); 349 f.FlushExecCtx(); 350 grpc_metadata_batch_destroy(&b); 351 track_counters.Finish(state); 352 } 353 BENCHMARK_TEMPLATE(BM_StreamCreateSendInitialMetadataDestroy, 354 RepresentativeClientInitialMetadata); 355 356 static void BM_TransportEmptyOp(benchmark::State& state) { 357 TrackCounters track_counters; 358 grpc_core::ExecCtx exec_ctx; 359 Fixture f(grpc::ChannelArguments(), true); 360 Stream s(&f); 361 s.Init(state); 362 grpc_transport_stream_op_batch op; 363 grpc_transport_stream_op_batch_payload op_payload; 364 memset(&op_payload, 0, sizeof(op_payload)); 365 auto reset_op = [&]() { 366 memset(&op, 0, sizeof(op)); 367 op.payload = &op_payload; 368 }; 369 std::unique_ptr<Closure> c = MakeClosure([&](grpc_error* error) { 370 if (!state.KeepRunning()) return; 371 reset_op(); 372 op.on_complete = c.get(); 373 s.Op(&op); 374 }); 375 GRPC_CLOSURE_SCHED(c.get(), GRPC_ERROR_NONE); 376 f.FlushExecCtx(); 377 reset_op(); 378 op.cancel_stream = true; 379 op_payload.cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; 380 s.Op(&op); 381 s.DestroyThen(MakeOnceClosure([](grpc_error* error) {})); 382 f.FlushExecCtx(); 383 track_counters.Finish(state); 384 } 385 BENCHMARK(BM_TransportEmptyOp); 386 387 std::vector<std::unique_ptr<gpr_event>> done_events; 388 389 static void BM_TransportStreamSend(benchmark::State& state) { 390 TrackCounters track_counters; 391 grpc_core::ExecCtx exec_ctx; 392 Fixture f(grpc::ChannelArguments(), true); 393 auto s = std::unique_ptr<Stream>(new Stream(&f)); 394 s->Init(state); 395 grpc_transport_stream_op_batch op; 396 grpc_transport_stream_op_batch_payload op_payload; 397 memset(&op_payload, 0, sizeof(op_payload)); 398 auto reset_op = [&]() { 399 memset(&op, 0, sizeof(op)); 400 op.payload = &op_payload; 401 }; 402 // Create the send_message payload slice. 403 // Note: We use grpc_slice_malloc_large() instead of grpc_slice_malloc() 404 // to force the slice to be refcounted, so that it remains alive when it 405 // is unreffed after each send_message op. 406 grpc_slice send_slice = grpc_slice_malloc_large(state.range(0)); 407 memset(GRPC_SLICE_START_PTR(send_slice), 0, GRPC_SLICE_LENGTH(send_slice)); 408 grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> send_stream; 409 grpc_metadata_batch b; 410 grpc_metadata_batch_init(&b); 411 b.deadline = GRPC_MILLIS_INF_FUTURE; 412 std::vector<grpc_mdelem> elems = 413 RepresentativeClientInitialMetadata::GetElems(); 414 std::vector<grpc_linked_mdelem> storage(elems.size()); 415 for (size_t i = 0; i < elems.size(); i++) { 416 GPR_ASSERT(GRPC_LOG_IF_ERROR( 417 "addmd", grpc_metadata_batch_add_tail(&b, &storage[i], elems[i]))); 418 } 419 420 gpr_event* bm_done = new gpr_event; 421 gpr_event_init(bm_done); 422 423 std::unique_ptr<Closure> c = MakeClosure([&](grpc_error* error) { 424 if (!state.KeepRunning()) { 425 gpr_event_set(bm_done, (void*)1); 426 return; 427 } 428 grpc_slice_buffer send_buffer; 429 grpc_slice_buffer_init(&send_buffer); 430 grpc_slice_buffer_add(&send_buffer, grpc_slice_ref(send_slice)); 431 send_stream.Init(&send_buffer, 0); 432 grpc_slice_buffer_destroy(&send_buffer); 433 // force outgoing window to be yuge 434 s->chttp2_stream()->flow_control->TestOnlyForceHugeWindow(); 435 f.chttp2_transport()->flow_control->TestOnlyForceHugeWindow(); 436 reset_op(); 437 op.on_complete = c.get(); 438 op.send_message = true; 439 op.payload->send_message.send_message.reset(send_stream.get()); 440 s->Op(&op); 441 }); 442 443 reset_op(); 444 op.send_initial_metadata = true; 445 op.payload->send_initial_metadata.send_initial_metadata = &b; 446 op.on_complete = c.get(); 447 s->Op(&op); 448 449 f.FlushExecCtx(); 450 gpr_event_wait(bm_done, gpr_inf_future(GPR_CLOCK_REALTIME)); 451 done_events.emplace_back(bm_done); 452 453 reset_op(); 454 op.cancel_stream = true; 455 op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; 456 s->Op(&op); 457 s->DestroyThen(MakeOnceClosure([](grpc_error* error) {})); 458 f.FlushExecCtx(); 459 s.reset(); 460 track_counters.Finish(state); 461 grpc_metadata_batch_destroy(&b); 462 grpc_slice_unref(send_slice); 463 } 464 BENCHMARK(BM_TransportStreamSend)->Range(0, 128 * 1024 * 1024); 465 466 #define SLICE_FROM_BUFFER(s) grpc_slice_from_static_buffer(s, sizeof(s) - 1) 467 468 static grpc_slice CreateIncomingDataSlice(size_t length, size_t frame_size) { 469 std::queue<char> unframed; 470 471 unframed.push(static_cast<uint8_t>(0)); 472 unframed.push(static_cast<uint8_t>(length >> 24)); 473 unframed.push(static_cast<uint8_t>(length >> 16)); 474 unframed.push(static_cast<uint8_t>(length >> 8)); 475 unframed.push(static_cast<uint8_t>(length)); 476 for (size_t i = 0; i < length; i++) { 477 unframed.push('a'); 478 } 479 480 std::vector<char> framed; 481 while (unframed.size() > frame_size) { 482 // frame size 483 framed.push_back(static_cast<uint8_t>(frame_size >> 16)); 484 framed.push_back(static_cast<uint8_t>(frame_size >> 8)); 485 framed.push_back(static_cast<uint8_t>(frame_size)); 486 // data frame 487 framed.push_back(0); 488 // no flags 489 framed.push_back(0); 490 // stream id 491 framed.push_back(0); 492 framed.push_back(0); 493 framed.push_back(0); 494 framed.push_back(1); 495 // frame data 496 for (size_t i = 0; i < frame_size; i++) { 497 framed.push_back(unframed.front()); 498 unframed.pop(); 499 } 500 } 501 502 // frame size 503 framed.push_back(static_cast<uint8_t>(unframed.size() >> 16)); 504 framed.push_back(static_cast<uint8_t>(unframed.size() >> 8)); 505 framed.push_back(static_cast<uint8_t>(unframed.size())); 506 // data frame 507 framed.push_back(0); 508 // no flags 509 framed.push_back(0); 510 // stream id 511 framed.push_back(0); 512 framed.push_back(0); 513 framed.push_back(0); 514 framed.push_back(1); 515 while (!unframed.empty()) { 516 framed.push_back(unframed.front()); 517 unframed.pop(); 518 } 519 520 return grpc_slice_from_copied_buffer(framed.data(), framed.size()); 521 } 522 523 static void BM_TransportStreamRecv(benchmark::State& state) { 524 TrackCounters track_counters; 525 grpc_core::ExecCtx exec_ctx; 526 Fixture f(grpc::ChannelArguments(), true); 527 Stream s(&f); 528 s.Init(state); 529 grpc_transport_stream_op_batch_payload op_payload; 530 memset(&op_payload, 0, sizeof(op_payload)); 531 grpc_transport_stream_op_batch op; 532 grpc_core::OrphanablePtr<grpc_core::ByteStream> recv_stream; 533 grpc_slice incoming_data = CreateIncomingDataSlice(state.range(0), 16384); 534 535 auto reset_op = [&]() { 536 memset(&op, 0, sizeof(op)); 537 op.payload = &op_payload; 538 }; 539 540 grpc_metadata_batch b; 541 grpc_metadata_batch_init(&b); 542 grpc_metadata_batch b_recv; 543 grpc_metadata_batch_init(&b_recv); 544 b.deadline = GRPC_MILLIS_INF_FUTURE; 545 std::vector<grpc_mdelem> elems = 546 RepresentativeClientInitialMetadata::GetElems(); 547 std::vector<grpc_linked_mdelem> storage(elems.size()); 548 for (size_t i = 0; i < elems.size(); i++) { 549 GPR_ASSERT(GRPC_LOG_IF_ERROR( 550 "addmd", grpc_metadata_batch_add_tail(&b, &storage[i], elems[i]))); 551 } 552 553 std::unique_ptr<Closure> do_nothing = MakeClosure([](grpc_error* error) {}); 554 555 uint32_t received; 556 557 std::unique_ptr<Closure> drain_start; 558 std::unique_ptr<Closure> drain; 559 std::unique_ptr<Closure> drain_continue; 560 grpc_slice recv_slice; 561 562 std::unique_ptr<Closure> c = MakeClosure([&](grpc_error* error) { 563 if (!state.KeepRunning()) return; 564 // force outgoing window to be yuge 565 s.chttp2_stream()->flow_control->TestOnlyForceHugeWindow(); 566 f.chttp2_transport()->flow_control->TestOnlyForceHugeWindow(); 567 received = 0; 568 reset_op(); 569 op.on_complete = do_nothing.get(); 570 op.recv_message = true; 571 op.payload->recv_message.recv_message = &recv_stream; 572 op.payload->recv_message.recv_message_ready = drain_start.get(); 573 s.Op(&op); 574 f.PushInput(grpc_slice_ref(incoming_data)); 575 }); 576 577 drain_start = MakeClosure([&](grpc_error* error) { 578 if (recv_stream == nullptr) { 579 GPR_ASSERT(!state.KeepRunning()); 580 return; 581 } 582 GRPC_CLOSURE_RUN(drain.get(), GRPC_ERROR_NONE); 583 }); 584 585 drain = MakeClosure([&](grpc_error* error) { 586 do { 587 if (received == recv_stream->length()) { 588 recv_stream.reset(); 589 GRPC_CLOSURE_SCHED(c.get(), GRPC_ERROR_NONE); 590 return; 591 } 592 } while (recv_stream->Next(recv_stream->length() - received, 593 drain_continue.get()) && 594 GRPC_ERROR_NONE == recv_stream->Pull(&recv_slice) && 595 (received += GRPC_SLICE_LENGTH(recv_slice), 596 grpc_slice_unref_internal(recv_slice), true)); 597 }); 598 599 drain_continue = MakeClosure([&](grpc_error* error) { 600 recv_stream->Pull(&recv_slice); 601 received += GRPC_SLICE_LENGTH(recv_slice); 602 grpc_slice_unref_internal(recv_slice); 603 GRPC_CLOSURE_RUN(drain.get(), GRPC_ERROR_NONE); 604 }); 605 606 reset_op(); 607 op.send_initial_metadata = true; 608 op.payload->send_initial_metadata.send_initial_metadata = &b; 609 op.recv_initial_metadata = true; 610 op.payload->recv_initial_metadata.recv_initial_metadata = &b_recv; 611 op.payload->recv_initial_metadata.recv_initial_metadata_ready = 612 do_nothing.get(); 613 op.on_complete = c.get(); 614 s.Op(&op); 615 f.PushInput(SLICE_FROM_BUFFER( 616 "\x00\x00\x00\x04\x00\x00\x00\x00\x00" 617 // Generated using: 618 // tools/codegen/core/gen_header_frame.py < 619 // test/cpp/microbenchmarks/representative_server_initial_metadata.headers 620 "\x00\x00X\x01\x04\x00\x00\x00\x01" 621 "\x10\x07:status\x03" 622 "200" 623 "\x10\x0c" 624 "content-type\x10" 625 "application/grpc" 626 "\x10\x14grpc-accept-encoding\x15identity,deflate,gzip")); 627 628 f.FlushExecCtx(); 629 reset_op(); 630 op.cancel_stream = true; 631 op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; 632 s.Op(&op); 633 s.DestroyThen(MakeOnceClosure([](grpc_error* error) {})); 634 f.FlushExecCtx(); 635 track_counters.Finish(state); 636 grpc_metadata_batch_destroy(&b); 637 grpc_metadata_batch_destroy(&b_recv); 638 grpc_slice_unref(incoming_data); 639 } 640 BENCHMARK(BM_TransportStreamRecv)->Range(0, 128 * 1024 * 1024); 641 642 // Some distros have RunSpecifiedBenchmarks under the benchmark namespace, 643 // and others do not. This allows us to support both modes. 644 namespace benchmark { 645 void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); } 646 } // namespace benchmark 647 648 int main(int argc, char** argv) { 649 ::benchmark::Initialize(&argc, argv); 650 ::grpc::testing::InitTest(&argc, &argv, false); 651 benchmark::RunTheBenchmarksNamespaced(); 652 return 0; 653 } 654