Home | History | Annotate | Download | only in end2end
      1 /*
      2  *
      3  * Copyright 2016 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #include "test/cpp/end2end/test_service_impl.h"
     20 
     21 #include <string>
     22 #include <thread>
     23 
     24 #include <grpc/support/log.h>
     25 #include <grpcpp/security/credentials.h>
     26 #include <grpcpp/server_context.h>
     27 
     28 #include "src/proto/grpc/testing/echo.grpc.pb.h"
     29 #include "test/cpp/util/string_ref_helper.h"
     30 
     31 #include <gtest/gtest.h>
     32 
     33 using std::chrono::system_clock;
     34 
     35 namespace grpc {
     36 namespace testing {
     37 namespace {
     38 
     39 // When echo_deadline is requested, deadline seen in the ServerContext is set in
     40 // the response in seconds.
     41 void MaybeEchoDeadline(ServerContext* context, const EchoRequest* request,
     42                        EchoResponse* response) {
     43   if (request->has_param() && request->param().echo_deadline()) {
     44     gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
     45     if (context->deadline() != system_clock::time_point::max()) {
     46       Timepoint2Timespec(context->deadline(), &deadline);
     47     }
     48     response->mutable_param()->set_request_deadline(deadline.tv_sec);
     49   }
     50 }
     51 
     52 void CheckServerAuthContext(
     53     const ServerContext* context,
     54     const grpc::string& expected_transport_security_type,
     55     const grpc::string& expected_client_identity) {
     56   std::shared_ptr<const AuthContext> auth_ctx = context->auth_context();
     57   std::vector<grpc::string_ref> tst =
     58       auth_ctx->FindPropertyValues("transport_security_type");
     59   EXPECT_EQ(1u, tst.size());
     60   EXPECT_EQ(expected_transport_security_type, ToString(tst[0]));
     61   if (expected_client_identity.empty()) {
     62     EXPECT_TRUE(auth_ctx->GetPeerIdentityPropertyName().empty());
     63     EXPECT_TRUE(auth_ctx->GetPeerIdentity().empty());
     64     EXPECT_FALSE(auth_ctx->IsPeerAuthenticated());
     65   } else {
     66     auto identity = auth_ctx->GetPeerIdentity();
     67     EXPECT_TRUE(auth_ctx->IsPeerAuthenticated());
     68     EXPECT_EQ(1u, identity.size());
     69     EXPECT_EQ(expected_client_identity, identity[0]);
     70   }
     71 }
     72 }  // namespace
     73 
     74 Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request,
     75                              EchoResponse* response) {
     76   // A bit of sleep to make sure that short deadline tests fail
     77   if (request->has_param() && request->param().server_sleep_us() > 0) {
     78     gpr_sleep_until(
     79         gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
     80                      gpr_time_from_micros(request->param().server_sleep_us(),
     81                                           GPR_TIMESPAN)));
     82   }
     83 
     84   if (request->has_param() && request->param().server_die()) {
     85     gpr_log(GPR_ERROR, "The request should not reach application handler.");
     86     GPR_ASSERT(0);
     87   }
     88   if (request->has_param() && request->param().has_expected_error()) {
     89     const auto& error = request->param().expected_error();
     90     return Status(static_cast<StatusCode>(error.code()), error.error_message(),
     91                   error.binary_error_details());
     92   }
     93   int server_try_cancel = GetIntValueFromMetadata(
     94       kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
     95   if (server_try_cancel > DO_NOT_CANCEL) {
     96     // Since this is a unary RPC, by the time this server handler is called,
     97     // the 'request' message is already read from the client. So the scenarios
     98     // in server_try_cancel don't make much sense. Just cancel the RPC as long
     99     // as server_try_cancel is not DO_NOT_CANCEL
    100     ServerTryCancel(context);
    101     return Status::CANCELLED;
    102   }
    103 
    104   response->set_message(request->message());
    105   MaybeEchoDeadline(context, request, response);
    106   if (host_) {
    107     response->mutable_param()->set_host(*host_);
    108   }
    109   if (request->has_param() && request->param().client_cancel_after_us()) {
    110     {
    111       std::unique_lock<std::mutex> lock(mu_);
    112       signal_client_ = true;
    113     }
    114     while (!context->IsCancelled()) {
    115       gpr_sleep_until(gpr_time_add(
    116           gpr_now(GPR_CLOCK_REALTIME),
    117           gpr_time_from_micros(request->param().client_cancel_after_us(),
    118                                GPR_TIMESPAN)));
    119     }
    120     return Status::CANCELLED;
    121   } else if (request->has_param() &&
    122              request->param().server_cancel_after_us()) {
    123     gpr_sleep_until(gpr_time_add(
    124         gpr_now(GPR_CLOCK_REALTIME),
    125         gpr_time_from_micros(request->param().server_cancel_after_us(),
    126                              GPR_TIMESPAN)));
    127     return Status::CANCELLED;
    128   } else if (!request->has_param() ||
    129              !request->param().skip_cancelled_check()) {
    130     EXPECT_FALSE(context->IsCancelled());
    131   }
    132 
    133   if (request->has_param() && request->param().echo_metadata()) {
    134     const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata =
    135         context->client_metadata();
    136     for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator
    137              iter = client_metadata.begin();
    138          iter != client_metadata.end(); ++iter) {
    139       context->AddTrailingMetadata(ToString(iter->first),
    140                                    ToString(iter->second));
    141     }
    142     // Terminate rpc with error and debug info in trailer.
    143     if (request->param().debug_info().stack_entries_size() ||
    144         !request->param().debug_info().detail().empty()) {
    145       grpc::string serialized_debug_info =
    146           request->param().debug_info().SerializeAsString();
    147       context->AddTrailingMetadata(kDebugInfoTrailerKey, serialized_debug_info);
    148       return Status::CANCELLED;
    149     }
    150   }
    151   if (request->has_param() &&
    152       (request->param().expected_client_identity().length() > 0 ||
    153        request->param().check_auth_context())) {
    154     CheckServerAuthContext(context,
    155                            request->param().expected_transport_security_type(),
    156                            request->param().expected_client_identity());
    157   }
    158   if (request->has_param() && request->param().response_message_length() > 0) {
    159     response->set_message(
    160         grpc::string(request->param().response_message_length(), '\0'));
    161   }
    162   if (request->has_param() && request->param().echo_peer()) {
    163     response->mutable_param()->set_peer(context->peer());
    164   }
    165   return Status::OK;
    166 }
    167 
    168 // Unimplemented is left unimplemented to test the returned error.
    169 
    170 Status TestServiceImpl::RequestStream(ServerContext* context,
    171                                       ServerReader<EchoRequest>* reader,
    172                                       EchoResponse* response) {
    173   // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by
    174   // the server by calling ServerContext::TryCancel() depending on the value:
    175   //   CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads
    176   //   any message from the client
    177   //   CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
    178   //   reading messages from the client
    179   //   CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads
    180   //   all the messages from the client
    181   int server_try_cancel = GetIntValueFromMetadata(
    182       kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
    183 
    184   EchoRequest request;
    185   response->set_message("");
    186 
    187   if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
    188     ServerTryCancel(context);
    189     return Status::CANCELLED;
    190   }
    191 
    192   std::thread* server_try_cancel_thd = nullptr;
    193   if (server_try_cancel == CANCEL_DURING_PROCESSING) {
    194     server_try_cancel_thd =
    195         new std::thread(&TestServiceImpl::ServerTryCancel, this, context);
    196   }
    197 
    198   int num_msgs_read = 0;
    199   while (reader->Read(&request)) {
    200     response->mutable_message()->append(request.message());
    201   }
    202   gpr_log(GPR_INFO, "Read: %d messages", num_msgs_read);
    203 
    204   if (server_try_cancel_thd != nullptr) {
    205     server_try_cancel_thd->join();
    206     delete server_try_cancel_thd;
    207     return Status::CANCELLED;
    208   }
    209 
    210   if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
    211     ServerTryCancel(context);
    212     return Status::CANCELLED;
    213   }
    214 
    215   return Status::OK;
    216 }
    217 
    218 // Return 'kNumResponseStreamMsgs' messages.
    219 // TODO(yangg) make it generic by adding a parameter into EchoRequest
    220 Status TestServiceImpl::ResponseStream(ServerContext* context,
    221                                        const EchoRequest* request,
    222                                        ServerWriter<EchoResponse>* writer) {
    223   // If server_try_cancel is set in the metadata, the RPC is cancelled by the
    224   // server by calling ServerContext::TryCancel() depending on the value:
    225   //   CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server writes
    226   //   any messages to the client
    227   //   CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
    228   //   writing messages to the client
    229   //   CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server writes
    230   //   all the messages to the client
    231   int server_try_cancel = GetIntValueFromMetadata(
    232       kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
    233 
    234   int server_coalescing_api = GetIntValueFromMetadata(
    235       kServerUseCoalescingApi, context->client_metadata(), 0);
    236 
    237   int server_responses_to_send = GetIntValueFromMetadata(
    238       kServerResponseStreamsToSend, context->client_metadata(),
    239       kServerDefaultResponseStreamsToSend);
    240 
    241   if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
    242     ServerTryCancel(context);
    243     return Status::CANCELLED;
    244   }
    245 
    246   EchoResponse response;
    247   std::thread* server_try_cancel_thd = nullptr;
    248   if (server_try_cancel == CANCEL_DURING_PROCESSING) {
    249     server_try_cancel_thd =
    250         new std::thread(&TestServiceImpl::ServerTryCancel, this, context);
    251   }
    252 
    253   for (int i = 0; i < server_responses_to_send; i++) {
    254     response.set_message(request->message() + grpc::to_string(i));
    255     if (i == server_responses_to_send - 1 && server_coalescing_api != 0) {
    256       writer->WriteLast(response, WriteOptions());
    257     } else {
    258       writer->Write(response);
    259     }
    260   }
    261 
    262   if (server_try_cancel_thd != nullptr) {
    263     server_try_cancel_thd->join();
    264     delete server_try_cancel_thd;
    265     return Status::CANCELLED;
    266   }
    267 
    268   if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
    269     ServerTryCancel(context);
    270     return Status::CANCELLED;
    271   }
    272 
    273   return Status::OK;
    274 }
    275 
    276 Status TestServiceImpl::BidiStream(
    277     ServerContext* context,
    278     ServerReaderWriter<EchoResponse, EchoRequest>* stream) {
    279   // If server_try_cancel is set in the metadata, the RPC is cancelled by the
    280   // server by calling ServerContext::TryCancel() depending on the value:
    281   //   CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads/
    282   //   writes any messages from/to the client
    283   //   CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
    284   //   reading/writing messages from/to the client
    285   //   CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server
    286   //   reads/writes all messages from/to the client
    287   int server_try_cancel = GetIntValueFromMetadata(
    288       kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
    289 
    290   EchoRequest request;
    291   EchoResponse response;
    292 
    293   if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
    294     ServerTryCancel(context);
    295     return Status::CANCELLED;
    296   }
    297 
    298   std::thread* server_try_cancel_thd = nullptr;
    299   if (server_try_cancel == CANCEL_DURING_PROCESSING) {
    300     server_try_cancel_thd =
    301         new std::thread(&TestServiceImpl::ServerTryCancel, this, context);
    302   }
    303 
    304   // kServerFinishAfterNReads suggests after how many reads, the server should
    305   // write the last message and send status (coalesced using WriteLast)
    306   int server_write_last = GetIntValueFromMetadata(
    307       kServerFinishAfterNReads, context->client_metadata(), 0);
    308 
    309   int read_counts = 0;
    310   while (stream->Read(&request)) {
    311     read_counts++;
    312     gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
    313     response.set_message(request.message());
    314     if (read_counts == server_write_last) {
    315       stream->WriteLast(response, WriteOptions());
    316     } else {
    317       stream->Write(response);
    318     }
    319   }
    320 
    321   if (server_try_cancel_thd != nullptr) {
    322     server_try_cancel_thd->join();
    323     delete server_try_cancel_thd;
    324     return Status::CANCELLED;
    325   }
    326 
    327   if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
    328     ServerTryCancel(context);
    329     return Status::CANCELLED;
    330   }
    331 
    332   return Status::OK;
    333 }
    334 
    335 int TestServiceImpl::GetIntValueFromMetadata(
    336     const char* key,
    337     const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
    338     int default_value) {
    339   if (metadata.find(key) != metadata.end()) {
    340     std::istringstream iss(ToString(metadata.find(key)->second));
    341     iss >> default_value;
    342     gpr_log(GPR_INFO, "%s : %d", key, default_value);
    343   }
    344 
    345   return default_value;
    346 }
    347 
    348 void TestServiceImpl::ServerTryCancel(ServerContext* context) {
    349   EXPECT_FALSE(context->IsCancelled());
    350   context->TryCancel();
    351   gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request");
    352   // Now wait until it's really canceled
    353   while (!context->IsCancelled()) {
    354     gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
    355                                  gpr_time_from_micros(1000, GPR_TIMESPAN)));
    356   }
    357 }
    358 
    359 }  // namespace testing
    360 }  // namespace grpc
    361