Home | History | Annotate | Download | only in performance
      1 /*
      2  *
      3  * Copyright 2017 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #include <grpc/support/log.h>
     20 #include <grpcpp/channel.h>
     21 #include <grpcpp/create_channel.h>
     22 #include <grpcpp/impl/grpc_library.h>
     23 #include <grpcpp/security/credentials.h>
     24 #include <grpcpp/security/server_credentials.h>
     25 #include <grpcpp/server.h>
     26 #include <grpcpp/server_builder.h>
     27 #include <gtest/gtest.h>
     28 
     29 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
     30 #include "src/core/lib/channel/channel_args.h"
     31 #include "src/core/lib/iomgr/endpoint.h"
     32 #include "src/core/lib/iomgr/endpoint_pair.h"
     33 #include "src/core/lib/iomgr/exec_ctx.h"
     34 #include "src/core/lib/iomgr/tcp_posix.h"
     35 #include "src/core/lib/surface/channel.h"
     36 #include "src/core/lib/surface/completion_queue.h"
     37 #include "src/core/lib/surface/server.h"
     38 #include "test/core/util/passthru_endpoint.h"
     39 #include "test/core/util/port.h"
     40 
     41 #include "src/cpp/client/create_channel_internal.h"
     42 #include "src/proto/grpc/testing/echo.grpc.pb.h"
     43 #include "test/core/util/test_config.h"
     44 
     45 namespace grpc {
     46 namespace testing {
     47 
     48 static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); }
     49 
     50 static void ApplyCommonServerBuilderConfig(ServerBuilder* b) {
     51   b->SetMaxReceiveMessageSize(INT_MAX);
     52   b->SetMaxSendMessageSize(INT_MAX);
     53 }
     54 
     55 static void ApplyCommonChannelArguments(ChannelArguments* c) {
     56   c->SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, INT_MAX);
     57   c->SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, INT_MAX);
     58 }
     59 
     60 static class InitializeStuff {
     61  public:
     62   InitializeStuff() {
     63     init_lib_.init();
     64     rq_ = grpc_resource_quota_create("bm");
     65   }
     66 
     67   ~InitializeStuff() { init_lib_.shutdown(); }
     68 
     69   grpc_resource_quota* rq() { return rq_; }
     70 
     71  private:
     72   internal::GrpcLibrary init_lib_;
     73   grpc_resource_quota* rq_;
     74 } initialize_stuff;
     75 
     76 class EndpointPairFixture {
     77  public:
     78   EndpointPairFixture(Service* service, grpc_endpoint_pair endpoints) {
     79     ServerBuilder b;
     80     cq_ = b.AddCompletionQueue(true);
     81     b.RegisterService(service);
     82     ApplyCommonServerBuilderConfig(&b);
     83     server_ = b.BuildAndStart();
     84 
     85     grpc_core::ExecCtx exec_ctx;
     86 
     87     /* add server endpoint to server_ */
     88     {
     89       const grpc_channel_args* server_args =
     90           grpc_server_get_channel_args(server_->c_server());
     91       grpc_transport* transport = grpc_create_chttp2_transport(
     92           server_args, endpoints.server, false /* is_client */);
     93 
     94       grpc_pollset** pollsets;
     95       size_t num_pollsets = 0;
     96       grpc_server_get_pollsets(server_->c_server(), &pollsets, &num_pollsets);
     97 
     98       for (size_t i = 0; i < num_pollsets; i++) {
     99         grpc_endpoint_add_to_pollset(endpoints.server, pollsets[i]);
    100       }
    101 
    102       grpc_server_setup_transport(server_->c_server(), transport, nullptr,
    103                                   server_args);
    104       grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
    105     }
    106 
    107     /* create channel */
    108     {
    109       ChannelArguments args;
    110       args.SetString(GRPC_ARG_DEFAULT_AUTHORITY, "test.authority");
    111       ApplyCommonChannelArguments(&args);
    112 
    113       grpc_channel_args c_args = args.c_channel_args();
    114       grpc_transport* transport =
    115           grpc_create_chttp2_transport(&c_args, endpoints.client, true);
    116       GPR_ASSERT(transport);
    117       grpc_channel* channel = grpc_channel_create(
    118           "target", &c_args, GRPC_CLIENT_DIRECT_CHANNEL, transport);
    119       grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
    120 
    121       channel_ = CreateChannelInternal("", channel);
    122     }
    123   }
    124 
    125   virtual ~EndpointPairFixture() {
    126     server_->Shutdown();
    127     cq_->Shutdown();
    128     void* tag;
    129     bool ok;
    130     while (cq_->Next(&tag, &ok)) {
    131     }
    132   }
    133 
    134   ServerCompletionQueue* cq() { return cq_.get(); }
    135   std::shared_ptr<Channel> channel() { return channel_; }
    136 
    137  private:
    138   std::unique_ptr<Server> server_;
    139   std::unique_ptr<ServerCompletionQueue> cq_;
    140   std::shared_ptr<Channel> channel_;
    141 };
    142 
    143 class InProcessCHTTP2 : public EndpointPairFixture {
    144  public:
    145   InProcessCHTTP2(Service* service, grpc_passthru_endpoint_stats* stats)
    146       : EndpointPairFixture(service, MakeEndpoints(stats)), stats_(stats) {}
    147 
    148   virtual ~InProcessCHTTP2() {
    149     if (stats_ != nullptr) {
    150       grpc_passthru_endpoint_stats_destroy(stats_);
    151     }
    152   }
    153 
    154   int writes_performed() const { return stats_->num_writes; }
    155 
    156  private:
    157   grpc_passthru_endpoint_stats* stats_;
    158 
    159   static grpc_endpoint_pair MakeEndpoints(grpc_passthru_endpoint_stats* stats) {
    160     grpc_endpoint_pair p;
    161     grpc_passthru_endpoint_create(&p.client, &p.server, initialize_stuff.rq(),
    162                                   stats);
    163     return p;
    164   }
    165 };
    166 
    167 static double UnaryPingPong(int request_size, int response_size) {
    168   const int kIterations = 10000;
    169 
    170   EchoTestService::AsyncService service;
    171   std::unique_ptr<InProcessCHTTP2> fixture(
    172       new InProcessCHTTP2(&service, grpc_passthru_endpoint_stats_create()));
    173   EchoRequest send_request;
    174   EchoResponse send_response;
    175   EchoResponse recv_response;
    176   if (request_size > 0) {
    177     send_request.set_message(std::string(request_size, 'a'));
    178   }
    179   if (response_size > 0) {
    180     send_response.set_message(std::string(response_size, 'a'));
    181   }
    182   Status recv_status;
    183   struct ServerEnv {
    184     ServerContext ctx;
    185     EchoRequest recv_request;
    186     grpc::ServerAsyncResponseWriter<EchoResponse> response_writer;
    187     ServerEnv() : response_writer(&ctx) {}
    188   };
    189   uint8_t server_env_buffer[2 * sizeof(ServerEnv)];
    190   ServerEnv* server_env[2] = {
    191       reinterpret_cast<ServerEnv*>(server_env_buffer),
    192       reinterpret_cast<ServerEnv*>(server_env_buffer + sizeof(ServerEnv))};
    193   new (server_env[0]) ServerEnv;
    194   new (server_env[1]) ServerEnv;
    195   service.RequestEcho(&server_env[0]->ctx, &server_env[0]->recv_request,
    196                       &server_env[0]->response_writer, fixture->cq(),
    197                       fixture->cq(), tag(0));
    198   service.RequestEcho(&server_env[1]->ctx, &server_env[1]->recv_request,
    199                       &server_env[1]->response_writer, fixture->cq(),
    200                       fixture->cq(), tag(1));
    201   std::unique_ptr<EchoTestService::Stub> stub(
    202       EchoTestService::NewStub(fixture->channel()));
    203   for (int iteration = 0; iteration < kIterations; iteration++) {
    204     recv_response.Clear();
    205     ClientContext cli_ctx;
    206     std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
    207         stub->AsyncEcho(&cli_ctx, send_request, fixture->cq()));
    208     void* t;
    209     bool ok;
    210     response_reader->Finish(&recv_response, &recv_status, tag(4));
    211     GPR_ASSERT(fixture->cq()->Next(&t, &ok));
    212     GPR_ASSERT(ok);
    213     GPR_ASSERT(t == tag(0) || t == tag(1));
    214     intptr_t slot = reinterpret_cast<intptr_t>(t);
    215     ServerEnv* senv = server_env[slot];
    216     senv->response_writer.Finish(send_response, Status::OK, tag(3));
    217     for (int i = (1 << 3) | (1 << 4); i != 0;) {
    218       GPR_ASSERT(fixture->cq()->Next(&t, &ok));
    219       GPR_ASSERT(ok);
    220       int tagnum = static_cast<int>(reinterpret_cast<intptr_t>(t));
    221       GPR_ASSERT(i & (1 << tagnum));
    222       i -= 1 << tagnum;
    223     }
    224     GPR_ASSERT(recv_status.ok());
    225 
    226     senv->~ServerEnv();
    227     senv = new (senv) ServerEnv();
    228     service.RequestEcho(&senv->ctx, &senv->recv_request, &senv->response_writer,
    229                         fixture->cq(), fixture->cq(), tag(slot));
    230   }
    231 
    232   double writes_per_iteration =
    233       static_cast<double>(fixture->writes_performed()) /
    234       static_cast<double>(kIterations);
    235 
    236   fixture.reset();
    237   server_env[0]->~ServerEnv();
    238   server_env[1]->~ServerEnv();
    239 
    240   return writes_per_iteration;
    241 }
    242 
    243 TEST(WritesPerRpcTest, UnaryPingPong) {
    244   EXPECT_LT(UnaryPingPong(0, 0), 2.05);
    245   EXPECT_LT(UnaryPingPong(1, 0), 2.05);
    246   EXPECT_LT(UnaryPingPong(0, 1), 2.05);
    247   EXPECT_LT(UnaryPingPong(4096, 0), 2.5);
    248   EXPECT_LT(UnaryPingPong(0, 4096), 2.5);
    249 }
    250 
    251 }  // namespace testing
    252 }  // namespace grpc
    253 
    254 int main(int argc, char** argv) {
    255   grpc_test_init(argc, argv);
    256   ::testing::InitGoogleTest(&argc, argv);
    257   return RUN_ALL_TESTS();
    258 }
    259