1 /* 2 * 3 * Copyright 2016 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 /* Benchmark gRPC end2end in various configurations */ 20 21 #ifndef TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PUMP_H 22 #define TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PUMP_H 23 24 #include <benchmark/benchmark.h> 25 #include <sstream> 26 #include "src/core/lib/profiling/timers.h" 27 #include "src/proto/grpc/testing/echo.grpc.pb.h" 28 #include "test/cpp/microbenchmarks/fullstack_context_mutators.h" 29 #include "test/cpp/microbenchmarks/fullstack_fixtures.h" 30 31 namespace grpc { 32 namespace testing { 33 34 /******************************************************************************* 35 * BENCHMARKING KERNELS 36 */ 37 38 static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); } 39 40 template <class Fixture> 41 static void BM_PumpStreamClientToServer(benchmark::State& state) { 42 EchoTestService::AsyncService service; 43 std::unique_ptr<Fixture> fixture(new Fixture(&service)); 44 { 45 EchoRequest send_request; 46 EchoRequest recv_request; 47 if (state.range(0) > 0) { 48 send_request.set_message(std::string(state.range(0), 'a')); 49 } 50 Status recv_status; 51 ServerContext svr_ctx; 52 ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); 53 service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), 54 fixture->cq(), tag(0)); 55 std::unique_ptr<EchoTestService::Stub> stub( 56 EchoTestService::NewStub(fixture->channel())); 57 ClientContext cli_ctx; 58 auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); 59 int need_tags = (1 << 0) | (1 << 1); 60 void* t; 61 bool ok; 62 while (need_tags) { 63 GPR_ASSERT(fixture->cq()->Next(&t, &ok)); 64 GPR_ASSERT(ok); 65 int i = static_cast<int>((intptr_t)t); 66 GPR_ASSERT(need_tags & (1 << i)); 67 need_tags &= ~(1 << i); 68 } 69 response_rw.Read(&recv_request, tag(0)); 70 while (state.KeepRunning()) { 71 GPR_TIMER_SCOPE("BenchmarkCycle", 0); 72 request_rw->Write(send_request, tag(1)); 73 while (true) { 74 GPR_ASSERT(fixture->cq()->Next(&t, &ok)); 75 if (t == tag(0)) { 76 response_rw.Read(&recv_request, tag(0)); 77 } else if (t == tag(1)) { 78 break; 79 } else { 80 GPR_ASSERT(false); 81 } 82 } 83 } 84 request_rw->WritesDone(tag(1)); 85 need_tags = (1 << 0) | (1 << 1); 86 while (need_tags) { 87 GPR_ASSERT(fixture->cq()->Next(&t, &ok)); 88 int i = static_cast<int>((intptr_t)t); 89 GPR_ASSERT(need_tags & (1 << i)); 90 need_tags &= ~(1 << i); 91 } 92 response_rw.Finish(Status::OK, tag(0)); 93 Status final_status; 94 request_rw->Finish(&final_status, tag(1)); 95 need_tags = (1 << 0) | (1 << 1); 96 while (need_tags) { 97 GPR_ASSERT(fixture->cq()->Next(&t, &ok)); 98 int i = static_cast<int>((intptr_t)t); 99 GPR_ASSERT(need_tags & (1 << i)); 100 need_tags &= ~(1 << i); 101 } 102 GPR_ASSERT(final_status.ok()); 103 } 104 fixture->Finish(state); 105 fixture.reset(); 106 state.SetBytesProcessed(state.range(0) * state.iterations()); 107 } 108 109 template <class Fixture> 110 static void BM_PumpStreamServerToClient(benchmark::State& state) { 111 EchoTestService::AsyncService service; 112 std::unique_ptr<Fixture> fixture(new Fixture(&service)); 113 { 114 EchoResponse send_response; 115 EchoResponse recv_response; 116 if (state.range(0) > 0) { 117 send_response.set_message(std::string(state.range(0), 'a')); 118 } 119 Status recv_status; 120 ServerContext svr_ctx; 121 ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx); 122 service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(), 123 fixture->cq(), tag(0)); 124 std::unique_ptr<EchoTestService::Stub> stub( 125 EchoTestService::NewStub(fixture->channel())); 126 ClientContext cli_ctx; 127 auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); 128 int need_tags = (1 << 0) | (1 << 1); 129 void* t; 130 bool ok; 131 while (need_tags) { 132 GPR_ASSERT(fixture->cq()->Next(&t, &ok)); 133 GPR_ASSERT(ok); 134 int i = static_cast<int>((intptr_t)t); 135 GPR_ASSERT(need_tags & (1 << i)); 136 need_tags &= ~(1 << i); 137 } 138 request_rw->Read(&recv_response, tag(0)); 139 while (state.KeepRunning()) { 140 GPR_TIMER_SCOPE("BenchmarkCycle", 0); 141 response_rw.Write(send_response, tag(1)); 142 while (true) { 143 GPR_ASSERT(fixture->cq()->Next(&t, &ok)); 144 if (t == tag(0)) { 145 request_rw->Read(&recv_response, tag(0)); 146 } else if (t == tag(1)) { 147 break; 148 } else { 149 GPR_ASSERT(false); 150 } 151 } 152 } 153 response_rw.Finish(Status::OK, tag(1)); 154 need_tags = (1 << 0) | (1 << 1); 155 while (need_tags) { 156 GPR_ASSERT(fixture->cq()->Next(&t, &ok)); 157 int i = static_cast<int>((intptr_t)t); 158 GPR_ASSERT(need_tags & (1 << i)); 159 need_tags &= ~(1 << i); 160 } 161 } 162 fixture->Finish(state); 163 fixture.reset(); 164 state.SetBytesProcessed(state.range(0) * state.iterations()); 165 } 166 } // namespace testing 167 } // namespace grpc 168 169 #endif // TEST_CPP_MICROBENCHMARKS_FULLSTACK_FIXTURES_H 170