Home | History | Annotate | Download | only in interop
      1 /*
      2  *
      3  * Copyright 2015-2016 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #include <fstream>
     20 #include <memory>
     21 #include <sstream>
     22 #include <thread>
     23 
     24 #include <gflags/gflags.h>
     25 #include <grpc/grpc.h>
     26 #include <grpc/support/log.h>
     27 #include <grpc/support/time.h>
     28 #include <grpcpp/security/server_credentials.h>
     29 #include <grpcpp/server.h>
     30 #include <grpcpp/server_builder.h>
     31 #include <grpcpp/server_context.h>
     32 
     33 #include "src/core/lib/gpr/string.h"
     34 #include "src/core/lib/transport/byte_stream.h"
     35 #include "src/proto/grpc/testing/empty.pb.h"
     36 #include "src/proto/grpc/testing/messages.pb.h"
     37 #include "src/proto/grpc/testing/test.grpc.pb.h"
     38 #include "test/cpp/interop/server_helper.h"
     39 #include "test/cpp/util/test_config.h"
     40 
     41 DEFINE_bool(use_alts, false,
     42             "Whether to use alts. Enable alts will disable tls.");
     43 DEFINE_bool(use_tls, false, "Whether to use tls.");
     44 DEFINE_string(custom_credentials_type, "", "User provided credentials type.");
     45 DEFINE_int32(port, 0, "Server port.");
     46 DEFINE_int32(max_send_message_size, -1, "The maximum send message size.");
     47 
     48 using grpc::Server;
     49 using grpc::ServerBuilder;
     50 using grpc::ServerContext;
     51 using grpc::ServerCredentials;
     52 using grpc::ServerReader;
     53 using grpc::ServerReaderWriter;
     54 using grpc::ServerWriter;
     55 using grpc::SslServerCredentialsOptions;
     56 using grpc::Status;
     57 using grpc::WriteOptions;
     58 using grpc::testing::InteropServerContextInspector;
     59 using grpc::testing::Payload;
     60 using grpc::testing::SimpleRequest;
     61 using grpc::testing::SimpleResponse;
     62 using grpc::testing::StreamingInputCallRequest;
     63 using grpc::testing::StreamingInputCallResponse;
     64 using grpc::testing::StreamingOutputCallRequest;
     65 using grpc::testing::StreamingOutputCallResponse;
     66 using grpc::testing::TestService;
     67 
     68 const char kEchoInitialMetadataKey[] = "x-grpc-test-echo-initial";
     69 const char kEchoTrailingBinMetadataKey[] = "x-grpc-test-echo-trailing-bin";
     70 const char kEchoUserAgentKey[] = "x-grpc-test-echo-useragent";
     71 
     72 void MaybeEchoMetadata(ServerContext* context) {
     73   const auto& client_metadata = context->client_metadata();
     74   GPR_ASSERT(client_metadata.count(kEchoInitialMetadataKey) <= 1);
     75   GPR_ASSERT(client_metadata.count(kEchoTrailingBinMetadataKey) <= 1);
     76 
     77   auto iter = client_metadata.find(kEchoInitialMetadataKey);
     78   if (iter != client_metadata.end()) {
     79     context->AddInitialMetadata(
     80         kEchoInitialMetadataKey,
     81         grpc::string(iter->second.begin(), iter->second.end()));
     82   }
     83   iter = client_metadata.find(kEchoTrailingBinMetadataKey);
     84   if (iter != client_metadata.end()) {
     85     context->AddTrailingMetadata(
     86         kEchoTrailingBinMetadataKey,
     87         grpc::string(iter->second.begin(), iter->second.end()));
     88   }
     89   // Check if client sent a magic key in the header that makes us echo
     90   // back the user-agent (for testing purpose)
     91   iter = client_metadata.find(kEchoUserAgentKey);
     92   if (iter != client_metadata.end()) {
     93     iter = client_metadata.find("user-agent");
     94     if (iter != client_metadata.end()) {
     95       context->AddInitialMetadata(
     96           kEchoUserAgentKey,
     97           grpc::string(iter->second.begin(), iter->second.end()));
     98     }
     99   }
    100 }
    101 
    102 bool SetPayload(int size, Payload* payload) {
    103   std::unique_ptr<char[]> body(new char[size]());
    104   payload->set_body(body.get(), size);
    105   return true;
    106 }
    107 
    108 bool CheckExpectedCompression(const ServerContext& context,
    109                               const bool compression_expected) {
    110   const InteropServerContextInspector inspector(context);
    111   const grpc_compression_algorithm received_compression =
    112       inspector.GetCallCompressionAlgorithm();
    113 
    114   if (compression_expected) {
    115     if (received_compression == GRPC_COMPRESS_NONE) {
    116       // Expected some compression, got NONE. This is an error.
    117       gpr_log(GPR_ERROR,
    118               "Expected compression but got uncompressed request from client.");
    119       return false;
    120     }
    121     if (!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)) {
    122       gpr_log(GPR_ERROR,
    123               "Failure: Requested compression in a compressable request, but "
    124               "compression bit in message flags not set.");
    125       return false;
    126     }
    127   } else {
    128     // Didn't expect compression -> make sure the request is uncompressed
    129     if (inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS) {
    130       gpr_log(GPR_ERROR,
    131               "Failure: Didn't requested compression, but compression bit in "
    132               "message flags set.");
    133       return false;
    134     }
    135   }
    136   return true;
    137 }
    138 
    139 class TestServiceImpl : public TestService::Service {
    140  public:
    141   Status EmptyCall(ServerContext* context, const grpc::testing::Empty* request,
    142                    grpc::testing::Empty* response) {
    143     MaybeEchoMetadata(context);
    144     return Status::OK;
    145   }
    146 
    147   // Response contains current timestamp. We ignore everything in the request.
    148   Status CacheableUnaryCall(ServerContext* context,
    149                             const SimpleRequest* request,
    150                             SimpleResponse* response) {
    151     gpr_timespec ts = gpr_now(GPR_CLOCK_PRECISE);
    152     std::string timestamp = std::to_string((long long unsigned)ts.tv_nsec);
    153     response->mutable_payload()->set_body(timestamp.c_str(), timestamp.size());
    154     context->AddInitialMetadata("cache-control", "max-age=60, public");
    155     return Status::OK;
    156   }
    157 
    158   Status UnaryCall(ServerContext* context, const SimpleRequest* request,
    159                    SimpleResponse* response) {
    160     MaybeEchoMetadata(context);
    161     if (request->has_response_compressed()) {
    162       const bool compression_requested = request->response_compressed().value();
    163       gpr_log(GPR_DEBUG, "Request for compression (%s) present for %s",
    164               compression_requested ? "enabled" : "disabled", __func__);
    165       if (compression_requested) {
    166         // Any level would do, let's go for HIGH because we are overachievers.
    167         context->set_compression_level(GRPC_COMPRESS_LEVEL_HIGH);
    168       } else {
    169         context->set_compression_level(GRPC_COMPRESS_LEVEL_NONE);
    170       }
    171     }
    172     if (!CheckExpectedCompression(*context,
    173                                   request->expect_compressed().value())) {
    174       return Status(grpc::StatusCode::INVALID_ARGUMENT,
    175                     "Compressed request expectation not met.");
    176     }
    177     if (request->response_size() > 0) {
    178       if (!SetPayload(request->response_size(), response->mutable_payload())) {
    179         return Status(grpc::StatusCode::INVALID_ARGUMENT,
    180                       "Error creating payload.");
    181       }
    182     }
    183 
    184     if (request->has_response_status()) {
    185       return Status(
    186           static_cast<grpc::StatusCode>(request->response_status().code()),
    187           request->response_status().message());
    188     }
    189 
    190     return Status::OK;
    191   }
    192 
    193   Status StreamingOutputCall(
    194       ServerContext* context, const StreamingOutputCallRequest* request,
    195       ServerWriter<StreamingOutputCallResponse>* writer) {
    196     StreamingOutputCallResponse response;
    197     bool write_success = true;
    198     for (int i = 0; write_success && i < request->response_parameters_size();
    199          i++) {
    200       if (!SetPayload(request->response_parameters(i).size(),
    201                       response.mutable_payload())) {
    202         return Status(grpc::StatusCode::INVALID_ARGUMENT,
    203                       "Error creating payload.");
    204       }
    205       WriteOptions wopts;
    206       if (request->response_parameters(i).has_compressed()) {
    207         // Compress by default. Disabled on a per-message basis.
    208         context->set_compression_level(GRPC_COMPRESS_LEVEL_HIGH);
    209         const bool compression_requested =
    210             request->response_parameters(i).compressed().value();
    211         gpr_log(GPR_DEBUG, "Request for compression (%s) present for %s",
    212                 compression_requested ? "enabled" : "disabled", __func__);
    213         if (!compression_requested) {
    214           wopts.set_no_compression();
    215         }  // else, compression is already enabled via the context.
    216       }
    217       int time_us;
    218       if ((time_us = request->response_parameters(i).interval_us()) > 0) {
    219         // Sleep before response if needed
    220         gpr_timespec sleep_time =
    221             gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
    222                          gpr_time_from_micros(time_us, GPR_TIMESPAN));
    223         gpr_sleep_until(sleep_time);
    224       }
    225       write_success = writer->Write(response, wopts);
    226     }
    227     if (write_success) {
    228       return Status::OK;
    229     } else {
    230       return Status(grpc::StatusCode::INTERNAL, "Error writing response.");
    231     }
    232   }
    233 
    234   Status StreamingInputCall(ServerContext* context,
    235                             ServerReader<StreamingInputCallRequest>* reader,
    236                             StreamingInputCallResponse* response) {
    237     StreamingInputCallRequest request;
    238     int aggregated_payload_size = 0;
    239     while (reader->Read(&request)) {
    240       if (!CheckExpectedCompression(*context,
    241                                     request.expect_compressed().value())) {
    242         return Status(grpc::StatusCode::INVALID_ARGUMENT,
    243                       "Compressed request expectation not met.");
    244       }
    245       if (request.has_payload()) {
    246         aggregated_payload_size += request.payload().body().size();
    247       }
    248     }
    249     response->set_aggregated_payload_size(aggregated_payload_size);
    250     return Status::OK;
    251   }
    252 
    253   Status FullDuplexCall(
    254       ServerContext* context,
    255       ServerReaderWriter<StreamingOutputCallResponse,
    256                          StreamingOutputCallRequest>* stream) {
    257     MaybeEchoMetadata(context);
    258     StreamingOutputCallRequest request;
    259     StreamingOutputCallResponse response;
    260     bool write_success = true;
    261     while (write_success && stream->Read(&request)) {
    262       if (request.has_response_status()) {
    263         return Status(
    264             static_cast<grpc::StatusCode>(request.response_status().code()),
    265             request.response_status().message());
    266       }
    267       if (request.response_parameters_size() != 0) {
    268         response.mutable_payload()->set_type(request.payload().type());
    269         response.mutable_payload()->set_body(
    270             grpc::string(request.response_parameters(0).size(), '\0'));
    271         int time_us;
    272         if ((time_us = request.response_parameters(0).interval_us()) > 0) {
    273           // Sleep before response if needed
    274           gpr_timespec sleep_time =
    275               gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
    276                            gpr_time_from_micros(time_us, GPR_TIMESPAN));
    277           gpr_sleep_until(sleep_time);
    278         }
    279         write_success = stream->Write(response);
    280       }
    281     }
    282     if (write_success) {
    283       return Status::OK;
    284     } else {
    285       return Status(grpc::StatusCode::INTERNAL, "Error writing response.");
    286     }
    287   }
    288 
    289   Status HalfDuplexCall(
    290       ServerContext* context,
    291       ServerReaderWriter<StreamingOutputCallResponse,
    292                          StreamingOutputCallRequest>* stream) {
    293     std::vector<StreamingOutputCallRequest> requests;
    294     StreamingOutputCallRequest request;
    295     while (stream->Read(&request)) {
    296       requests.push_back(request);
    297     }
    298 
    299     StreamingOutputCallResponse response;
    300     bool write_success = true;
    301     for (unsigned int i = 0; write_success && i < requests.size(); i++) {
    302       response.mutable_payload()->set_type(requests[i].payload().type());
    303       if (requests[i].response_parameters_size() == 0) {
    304         return Status(grpc::StatusCode::INTERNAL,
    305                       "Request does not have response parameters.");
    306       }
    307       response.mutable_payload()->set_body(
    308           grpc::string(requests[i].response_parameters(0).size(), '\0'));
    309       write_success = stream->Write(response);
    310     }
    311     if (write_success) {
    312       return Status::OK;
    313     } else {
    314       return Status(grpc::StatusCode::INTERNAL, "Error writing response.");
    315     }
    316   }
    317 };
    318 
    319 void grpc::testing::interop::RunServer(
    320     const std::shared_ptr<ServerCredentials>& creds) {
    321   RunServer(creds, FLAGS_port, nullptr, nullptr);
    322 }
    323 
    324 void grpc::testing::interop::RunServer(
    325     const std::shared_ptr<ServerCredentials>& creds,
    326     std::unique_ptr<std::vector<std::unique_ptr<ServerBuilderOption>>>
    327         server_options) {
    328   RunServer(creds, FLAGS_port, nullptr, std::move(server_options));
    329 }
    330 
    331 void grpc::testing::interop::RunServer(
    332     const std::shared_ptr<ServerCredentials>& creds, const int port,
    333     ServerStartedCondition* server_started_condition) {
    334   RunServer(creds, port, server_started_condition, nullptr);
    335 }
    336 
    337 void grpc::testing::interop::RunServer(
    338     const std::shared_ptr<ServerCredentials>& creds, const int port,
    339     ServerStartedCondition* server_started_condition,
    340     std::unique_ptr<std::vector<std::unique_ptr<ServerBuilderOption>>>
    341         server_options) {
    342   GPR_ASSERT(port != 0);
    343   std::ostringstream server_address;
    344   server_address << "0.0.0.0:" << port;
    345   TestServiceImpl service;
    346 
    347   SimpleRequest request;
    348   SimpleResponse response;
    349 
    350   ServerBuilder builder;
    351   builder.RegisterService(&service);
    352   builder.AddListeningPort(server_address.str(), creds);
    353   if (server_options != nullptr) {
    354     for (size_t i = 0; i < server_options->size(); i++) {
    355       builder.SetOption(std::move((*server_options)[i]));
    356     }
    357   }
    358   if (FLAGS_max_send_message_size >= 0) {
    359     builder.SetMaxSendMessageSize(FLAGS_max_send_message_size);
    360   }
    361   std::unique_ptr<Server> server(builder.BuildAndStart());
    362   gpr_log(GPR_INFO, "Server listening on %s", server_address.str().c_str());
    363 
    364   // Signal that the server has started.
    365   if (server_started_condition) {
    366     std::unique_lock<std::mutex> lock(server_started_condition->mutex);
    367     server_started_condition->server_started = true;
    368     server_started_condition->condition.notify_all();
    369   }
    370 
    371   while (!gpr_atm_no_barrier_load(&g_got_sigint)) {
    372     gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
    373                                  gpr_time_from_seconds(5, GPR_TIMESPAN)));
    374   }
    375 }
    376