1 /* 2 * 3 * Copyright 2017 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #include <grpc/support/log.h> 20 #include <grpcpp/channel.h> 21 #include <grpcpp/create_channel.h> 22 #include <grpcpp/impl/grpc_library.h> 23 #include <grpcpp/security/credentials.h> 24 #include <grpcpp/security/server_credentials.h> 25 #include <grpcpp/server.h> 26 #include <grpcpp/server_builder.h> 27 #include <gtest/gtest.h> 28 29 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" 30 #include "src/core/lib/channel/channel_args.h" 31 #include "src/core/lib/iomgr/endpoint.h" 32 #include "src/core/lib/iomgr/endpoint_pair.h" 33 #include "src/core/lib/iomgr/exec_ctx.h" 34 #include "src/core/lib/iomgr/tcp_posix.h" 35 #include "src/core/lib/surface/channel.h" 36 #include "src/core/lib/surface/completion_queue.h" 37 #include "src/core/lib/surface/server.h" 38 #include "test/core/util/passthru_endpoint.h" 39 #include "test/core/util/port.h" 40 41 #include "src/cpp/client/create_channel_internal.h" 42 #include "src/proto/grpc/testing/echo.grpc.pb.h" 43 #include "test/core/util/test_config.h" 44 45 namespace grpc { 46 namespace testing { 47 48 static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); } 49 50 static void ApplyCommonServerBuilderConfig(ServerBuilder* b) { 51 b->SetMaxReceiveMessageSize(INT_MAX); 52 b->SetMaxSendMessageSize(INT_MAX); 53 } 54 55 static void ApplyCommonChannelArguments(ChannelArguments* c) { 56 c->SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, INT_MAX); 57 c->SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, INT_MAX); 58 } 59 60 static class InitializeStuff { 61 public: 62 InitializeStuff() { 63 init_lib_.init(); 64 rq_ = grpc_resource_quota_create("bm"); 65 } 66 67 ~InitializeStuff() { init_lib_.shutdown(); } 68 69 grpc_resource_quota* rq() { return rq_; } 70 71 private: 72 internal::GrpcLibrary init_lib_; 73 grpc_resource_quota* rq_; 74 } initialize_stuff; 75 76 class EndpointPairFixture { 77 public: 78 EndpointPairFixture(Service* service, grpc_endpoint_pair endpoints) { 79 ServerBuilder b; 80 cq_ = b.AddCompletionQueue(true); 81 b.RegisterService(service); 82 ApplyCommonServerBuilderConfig(&b); 83 server_ = b.BuildAndStart(); 84 85 grpc_core::ExecCtx exec_ctx; 86 87 /* add server endpoint to server_ */ 88 { 89 const grpc_channel_args* server_args = 90 grpc_server_get_channel_args(server_->c_server()); 91 grpc_transport* transport = grpc_create_chttp2_transport( 92 server_args, endpoints.server, false /* is_client */); 93 94 grpc_pollset** pollsets; 95 size_t num_pollsets = 0; 96 grpc_server_get_pollsets(server_->c_server(), &pollsets, &num_pollsets); 97 98 for (size_t i = 0; i < num_pollsets; i++) { 99 grpc_endpoint_add_to_pollset(endpoints.server, pollsets[i]); 100 } 101 102 grpc_server_setup_transport(server_->c_server(), transport, nullptr, 103 server_args); 104 grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); 105 } 106 107 /* create channel */ 108 { 109 ChannelArguments args; 110 args.SetString(GRPC_ARG_DEFAULT_AUTHORITY, "test.authority"); 111 ApplyCommonChannelArguments(&args); 112 113 grpc_channel_args c_args = args.c_channel_args(); 114 grpc_transport* transport = 115 grpc_create_chttp2_transport(&c_args, endpoints.client, true); 116 GPR_ASSERT(transport); 117 grpc_channel* channel = grpc_channel_create( 118 "target", &c_args, GRPC_CLIENT_DIRECT_CHANNEL, transport); 119 grpc_chttp2_transport_start_reading(transport, nullptr, nullptr); 120 121 channel_ = CreateChannelInternal("", channel); 122 } 123 } 124 125 virtual ~EndpointPairFixture() { 126 server_->Shutdown(); 127 cq_->Shutdown(); 128 void* tag; 129 bool ok; 130 while (cq_->Next(&tag, &ok)) { 131 } 132 } 133 134 ServerCompletionQueue* cq() { return cq_.get(); } 135 std::shared_ptr<Channel> channel() { return channel_; } 136 137 private: 138 std::unique_ptr<Server> server_; 139 std::unique_ptr<ServerCompletionQueue> cq_; 140 std::shared_ptr<Channel> channel_; 141 }; 142 143 class InProcessCHTTP2 : public EndpointPairFixture { 144 public: 145 InProcessCHTTP2(Service* service, grpc_passthru_endpoint_stats* stats) 146 : EndpointPairFixture(service, MakeEndpoints(stats)), stats_(stats) {} 147 148 virtual ~InProcessCHTTP2() { 149 if (stats_ != nullptr) { 150 grpc_passthru_endpoint_stats_destroy(stats_); 151 } 152 } 153 154 int writes_performed() const { return stats_->num_writes; } 155 156 private: 157 grpc_passthru_endpoint_stats* stats_; 158 159 static grpc_endpoint_pair MakeEndpoints(grpc_passthru_endpoint_stats* stats) { 160 grpc_endpoint_pair p; 161 grpc_passthru_endpoint_create(&p.client, &p.server, initialize_stuff.rq(), 162 stats); 163 return p; 164 } 165 }; 166 167 static double UnaryPingPong(int request_size, int response_size) { 168 const int kIterations = 10000; 169 170 EchoTestService::AsyncService service; 171 std::unique_ptr<InProcessCHTTP2> fixture( 172 new InProcessCHTTP2(&service, grpc_passthru_endpoint_stats_create())); 173 EchoRequest send_request; 174 EchoResponse send_response; 175 EchoResponse recv_response; 176 if (request_size > 0) { 177 send_request.set_message(std::string(request_size, 'a')); 178 } 179 if (response_size > 0) { 180 send_response.set_message(std::string(response_size, 'a')); 181 } 182 Status recv_status; 183 struct ServerEnv { 184 ServerContext ctx; 185 EchoRequest recv_request; 186 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer; 187 ServerEnv() : response_writer(&ctx) {} 188 }; 189 uint8_t server_env_buffer[2 * sizeof(ServerEnv)]; 190 ServerEnv* server_env[2] = { 191 reinterpret_cast<ServerEnv*>(server_env_buffer), 192 reinterpret_cast<ServerEnv*>(server_env_buffer + sizeof(ServerEnv))}; 193 new (server_env[0]) ServerEnv; 194 new (server_env[1]) ServerEnv; 195 service.RequestEcho(&server_env[0]->ctx, &server_env[0]->recv_request, 196 &server_env[0]->response_writer, fixture->cq(), 197 fixture->cq(), tag(0)); 198 service.RequestEcho(&server_env[1]->ctx, &server_env[1]->recv_request, 199 &server_env[1]->response_writer, fixture->cq(), 200 fixture->cq(), tag(1)); 201 std::unique_ptr<EchoTestService::Stub> stub( 202 EchoTestService::NewStub(fixture->channel())); 203 for (int iteration = 0; iteration < kIterations; iteration++) { 204 recv_response.Clear(); 205 ClientContext cli_ctx; 206 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( 207 stub->AsyncEcho(&cli_ctx, send_request, fixture->cq())); 208 void* t; 209 bool ok; 210 response_reader->Finish(&recv_response, &recv_status, tag(4)); 211 GPR_ASSERT(fixture->cq()->Next(&t, &ok)); 212 GPR_ASSERT(ok); 213 GPR_ASSERT(t == tag(0) || t == tag(1)); 214 intptr_t slot = reinterpret_cast<intptr_t>(t); 215 ServerEnv* senv = server_env[slot]; 216 senv->response_writer.Finish(send_response, Status::OK, tag(3)); 217 for (int i = (1 << 3) | (1 << 4); i != 0;) { 218 GPR_ASSERT(fixture->cq()->Next(&t, &ok)); 219 GPR_ASSERT(ok); 220 int tagnum = static_cast<int>(reinterpret_cast<intptr_t>(t)); 221 GPR_ASSERT(i & (1 << tagnum)); 222 i -= 1 << tagnum; 223 } 224 GPR_ASSERT(recv_status.ok()); 225 226 senv->~ServerEnv(); 227 senv = new (senv) ServerEnv(); 228 service.RequestEcho(&senv->ctx, &senv->recv_request, &senv->response_writer, 229 fixture->cq(), fixture->cq(), tag(slot)); 230 } 231 232 double writes_per_iteration = 233 static_cast<double>(fixture->writes_performed()) / 234 static_cast<double>(kIterations); 235 236 fixture.reset(); 237 server_env[0]->~ServerEnv(); 238 server_env[1]->~ServerEnv(); 239 240 return writes_per_iteration; 241 } 242 243 TEST(WritesPerRpcTest, UnaryPingPong) { 244 EXPECT_LT(UnaryPingPong(0, 0), 2.05); 245 EXPECT_LT(UnaryPingPong(1, 0), 2.05); 246 EXPECT_LT(UnaryPingPong(0, 1), 2.05); 247 EXPECT_LT(UnaryPingPong(4096, 0), 2.5); 248 EXPECT_LT(UnaryPingPong(0, 4096), 2.5); 249 } 250 251 } // namespace testing 252 } // namespace grpc 253 254 int main(int argc, char** argv) { 255 grpc_test_init(argc, argv); 256 ::testing::InitGoogleTest(&argc, argv); 257 return RUN_ALL_TESTS(); 258 } 259