Home | History | Annotate | Download | only in interop
      1 /*
      2  *
      3  * Copyright 2015-2016 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #include <cinttypes>
     20 #include <fstream>
     21 #include <memory>
     22 #include <utility>
     23 
     24 #include <grpc/grpc.h>
     25 #include <grpc/support/alloc.h>
     26 #include <grpc/support/log.h>
     27 #include <grpc/support/string_util.h>
     28 #include <grpc/support/time.h>
     29 #include <grpcpp/channel.h>
     30 #include <grpcpp/client_context.h>
     31 #include <grpcpp/security/credentials.h>
     32 
     33 #include "src/core/lib/transport/byte_stream.h"
     34 #include "src/proto/grpc/testing/empty.pb.h"
     35 #include "src/proto/grpc/testing/messages.pb.h"
     36 #include "src/proto/grpc/testing/test.grpc.pb.h"
     37 #include "test/cpp/interop/client_helper.h"
     38 #include "test/cpp/interop/interop_client.h"
     39 
     40 namespace grpc {
     41 namespace testing {
     42 
     43 namespace {
     44 // The same value is defined by the Java client.
     45 const std::vector<int> request_stream_sizes = {27182, 8, 1828, 45904};
     46 const std::vector<int> response_stream_sizes = {31415, 9, 2653, 58979};
     47 const int kNumResponseMessages = 2000;
     48 const int kResponseMessageSize = 1030;
     49 const int kReceiveDelayMilliSeconds = 20;
     50 const int kLargeRequestSize = 271828;
     51 const int kLargeResponseSize = 314159;
     52 
     53 void NoopChecks(const InteropClientContextInspector& inspector,
     54                 const SimpleRequest* request, const SimpleResponse* response) {}
     55 
     56 void UnaryCompressionChecks(const InteropClientContextInspector& inspector,
     57                             const SimpleRequest* request,
     58                             const SimpleResponse* response) {
     59   const grpc_compression_algorithm received_compression =
     60       inspector.GetCallCompressionAlgorithm();
     61   if (request->response_compressed().value()) {
     62     if (received_compression == GRPC_COMPRESS_NONE) {
     63       // Requested some compression, got NONE. This is an error.
     64       gpr_log(GPR_ERROR,
     65               "Failure: Requested compression but got uncompressed response "
     66               "from server.");
     67       abort();
     68     }
     69     GPR_ASSERT(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS);
     70   } else {
     71     // Didn't request compression -> make sure the response is uncompressed
     72     GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS));
     73   }
     74 }
     75 }  // namespace
     76 
     77 InteropClient::ServiceStub::ServiceStub(
     78     ChannelCreationFunc channel_creation_func, bool new_stub_every_call)
     79     : channel_creation_func_(channel_creation_func),
     80       channel_(channel_creation_func_()),
     81       new_stub_every_call_(new_stub_every_call) {
     82   // If new_stub_every_call is false, then this is our chance to initialize
     83   // stub_. (see Get())
     84   if (!new_stub_every_call) {
     85     stub_ = TestService::NewStub(channel_);
     86   }
     87 }
     88 
     89 TestService::Stub* InteropClient::ServiceStub::Get() {
     90   if (new_stub_every_call_) {
     91     stub_ = TestService::NewStub(channel_);
     92   }
     93 
     94   return stub_.get();
     95 }
     96 
     97 UnimplementedService::Stub*
     98 InteropClient::ServiceStub::GetUnimplementedServiceStub() {
     99   if (unimplemented_service_stub_ == nullptr) {
    100     unimplemented_service_stub_ = UnimplementedService::NewStub(channel_);
    101   }
    102   return unimplemented_service_stub_.get();
    103 }
    104 
    105 void InteropClient::ServiceStub::ResetChannel() {
    106   channel_ = channel_creation_func_();
    107   if (!new_stub_every_call_) {
    108     stub_ = TestService::NewStub(channel_);
    109   }
    110 }
    111 
    112 InteropClient::InteropClient(ChannelCreationFunc channel_creation_func,
    113                              bool new_stub_every_test_case,
    114                              bool do_not_abort_on_transient_failures)
    115     : serviceStub_(channel_creation_func, new_stub_every_test_case),
    116       do_not_abort_on_transient_failures_(do_not_abort_on_transient_failures) {}
    117 
    118 bool InteropClient::AssertStatusOk(const Status& s,
    119                                    const grpc::string& optional_debug_string) {
    120   if (s.ok()) {
    121     return true;
    122   }
    123 
    124   // Note: At this point, s.error_code is definitely not StatusCode::OK (we
    125   // already checked for s.ok() above). So, the following will call abort()
    126   // (unless s.error_code() corresponds to a transient failure and
    127   // 'do_not_abort_on_transient_failures' is true)
    128   return AssertStatusCode(s, StatusCode::OK, optional_debug_string);
    129 }
    130 
    131 bool InteropClient::AssertStatusCode(
    132     const Status& s, StatusCode expected_code,
    133     const grpc::string& optional_debug_string) {
    134   if (s.error_code() == expected_code) {
    135     return true;
    136   }
    137 
    138   gpr_log(GPR_ERROR,
    139           "Error status code: %d (expected: %d), message: %s,"
    140           " debug string: %s",
    141           s.error_code(), expected_code, s.error_message().c_str(),
    142           optional_debug_string.c_str());
    143 
    144   // In case of transient transient/retryable failures (like a broken
    145   // connection) we may or may not abort (see TransientFailureOrAbort())
    146   if (s.error_code() == grpc::StatusCode::UNAVAILABLE) {
    147     return TransientFailureOrAbort();
    148   }
    149 
    150   abort();
    151 }
    152 
    153 bool InteropClient::DoEmpty() {
    154   gpr_log(GPR_DEBUG, "Sending an empty rpc...");
    155 
    156   Empty request;
    157   Empty response;
    158   ClientContext context;
    159 
    160   Status s = serviceStub_.Get()->EmptyCall(&context, request, &response);
    161 
    162   if (!AssertStatusOk(s, context.debug_error_string())) {
    163     return false;
    164   }
    165 
    166   gpr_log(GPR_DEBUG, "Empty rpc done.");
    167   return true;
    168 }
    169 
    170 bool InteropClient::PerformLargeUnary(SimpleRequest* request,
    171                                       SimpleResponse* response) {
    172   return PerformLargeUnary(request, response, NoopChecks);
    173 }
    174 
    175 bool InteropClient::PerformLargeUnary(SimpleRequest* request,
    176                                       SimpleResponse* response,
    177                                       const CheckerFn& custom_checks_fn) {
    178   ClientContext context;
    179   InteropClientContextInspector inspector(context);
    180   request->set_response_size(kLargeResponseSize);
    181   grpc::string payload(kLargeRequestSize, '\0');
    182   request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
    183   if (request->has_expect_compressed()) {
    184     if (request->expect_compressed().value()) {
    185       context.set_compression_algorithm(GRPC_COMPRESS_GZIP);
    186     } else {
    187       context.set_compression_algorithm(GRPC_COMPRESS_NONE);
    188     }
    189   }
    190 
    191   Status s = serviceStub_.Get()->UnaryCall(&context, *request, response);
    192   if (!AssertStatusOk(s, context.debug_error_string())) {
    193     return false;
    194   }
    195 
    196   custom_checks_fn(inspector, request, response);
    197 
    198   // Payload related checks.
    199   GPR_ASSERT(response->payload().body() ==
    200              grpc::string(kLargeResponseSize, '\0'));
    201   return true;
    202 }
    203 
    204 bool InteropClient::DoComputeEngineCreds(
    205     const grpc::string& default_service_account,
    206     const grpc::string& oauth_scope) {
    207   gpr_log(GPR_DEBUG,
    208           "Sending a large unary rpc with compute engine credentials ...");
    209   SimpleRequest request;
    210   SimpleResponse response;
    211   request.set_fill_username(true);
    212   request.set_fill_oauth_scope(true);
    213 
    214   if (!PerformLargeUnary(&request, &response)) {
    215     return false;
    216   }
    217 
    218   gpr_log(GPR_DEBUG, "Got username %s", response.username().c_str());
    219   gpr_log(GPR_DEBUG, "Got oauth_scope %s", response.oauth_scope().c_str());
    220   GPR_ASSERT(!response.username().empty());
    221   GPR_ASSERT(response.username().c_str() == default_service_account);
    222   GPR_ASSERT(!response.oauth_scope().empty());
    223   const char* oauth_scope_str = response.oauth_scope().c_str();
    224   GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
    225   gpr_log(GPR_DEBUG, "Large unary with compute engine creds done.");
    226   return true;
    227 }
    228 
    229 bool InteropClient::DoOauth2AuthToken(const grpc::string& username,
    230                                       const grpc::string& oauth_scope) {
    231   gpr_log(GPR_DEBUG,
    232           "Sending a unary rpc with raw oauth2 access token credentials ...");
    233   SimpleRequest request;
    234   SimpleResponse response;
    235   request.set_fill_username(true);
    236   request.set_fill_oauth_scope(true);
    237 
    238   ClientContext context;
    239 
    240   Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
    241 
    242   if (!AssertStatusOk(s, context.debug_error_string())) {
    243     return false;
    244   }
    245 
    246   GPR_ASSERT(!response.username().empty());
    247   GPR_ASSERT(!response.oauth_scope().empty());
    248   GPR_ASSERT(username == response.username());
    249   const char* oauth_scope_str = response.oauth_scope().c_str();
    250   GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
    251   gpr_log(GPR_DEBUG, "Unary with oauth2 access token credentials done.");
    252   return true;
    253 }
    254 
    255 bool InteropClient::DoPerRpcCreds(const grpc::string& json_key) {
    256   gpr_log(GPR_DEBUG, "Sending a unary rpc with per-rpc JWT access token ...");
    257   SimpleRequest request;
    258   SimpleResponse response;
    259   request.set_fill_username(true);
    260 
    261   ClientContext context;
    262   std::chrono::seconds token_lifetime = std::chrono::hours(1);
    263   std::shared_ptr<CallCredentials> creds =
    264       ServiceAccountJWTAccessCredentials(json_key, token_lifetime.count());
    265 
    266   context.set_credentials(creds);
    267 
    268   Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
    269 
    270   if (!AssertStatusOk(s, context.debug_error_string())) {
    271     return false;
    272   }
    273 
    274   GPR_ASSERT(!response.username().empty());
    275   GPR_ASSERT(json_key.find(response.username()) != grpc::string::npos);
    276   gpr_log(GPR_DEBUG, "Unary with per-rpc JWT access token done.");
    277   return true;
    278 }
    279 
    280 bool InteropClient::DoJwtTokenCreds(const grpc::string& username) {
    281   gpr_log(GPR_DEBUG,
    282           "Sending a large unary rpc with JWT token credentials ...");
    283   SimpleRequest request;
    284   SimpleResponse response;
    285   request.set_fill_username(true);
    286 
    287   if (!PerformLargeUnary(&request, &response)) {
    288     return false;
    289   }
    290 
    291   GPR_ASSERT(!response.username().empty());
    292   GPR_ASSERT(username.find(response.username()) != grpc::string::npos);
    293   gpr_log(GPR_DEBUG, "Large unary with JWT token creds done.");
    294   return true;
    295 }
    296 
    297 bool InteropClient::DoLargeUnary() {
    298   gpr_log(GPR_DEBUG, "Sending a large unary rpc...");
    299   SimpleRequest request;
    300   SimpleResponse response;
    301   if (!PerformLargeUnary(&request, &response)) {
    302     return false;
    303   }
    304   gpr_log(GPR_DEBUG, "Large unary done.");
    305   return true;
    306 }
    307 
    308 bool InteropClient::DoClientCompressedUnary() {
    309   // Probing for compression-checks support.
    310   ClientContext probe_context;
    311   SimpleRequest probe_req;
    312   SimpleResponse probe_res;
    313 
    314   probe_context.set_compression_algorithm(GRPC_COMPRESS_NONE);
    315   probe_req.mutable_expect_compressed()->set_value(true);  // lies!
    316 
    317   probe_req.set_response_size(kLargeResponseSize);
    318   probe_req.mutable_payload()->set_body(grpc::string(kLargeRequestSize, '\0'));
    319 
    320   gpr_log(GPR_DEBUG, "Sending probe for compressed unary request.");
    321   const Status s =
    322       serviceStub_.Get()->UnaryCall(&probe_context, probe_req, &probe_res);
    323   if (s.error_code() != grpc::StatusCode::INVALID_ARGUMENT) {
    324     // The server isn't able to evaluate incoming compression, making the rest
    325     // of this test moot.
    326     gpr_log(GPR_DEBUG, "Compressed unary request probe failed");
    327     return false;
    328   }
    329   gpr_log(GPR_DEBUG, "Compressed unary request probe succeeded. Proceeding.");
    330 
    331   const std::vector<bool> compressions = {true, false};
    332   for (size_t i = 0; i < compressions.size(); i++) {
    333     char* log_suffix;
    334     gpr_asprintf(&log_suffix, "(compression=%s)",
    335                  compressions[i] ? "true" : "false");
    336 
    337     gpr_log(GPR_DEBUG, "Sending compressed unary request %s.", log_suffix);
    338     SimpleRequest request;
    339     SimpleResponse response;
    340     request.mutable_expect_compressed()->set_value(compressions[i]);
    341     if (!PerformLargeUnary(&request, &response, UnaryCompressionChecks)) {
    342       gpr_log(GPR_ERROR, "Compressed unary request failed %s", log_suffix);
    343       gpr_free(log_suffix);
    344       return false;
    345     }
    346 
    347     gpr_log(GPR_DEBUG, "Compressed unary request failed %s", log_suffix);
    348     gpr_free(log_suffix);
    349   }
    350 
    351   return true;
    352 }
    353 
    354 bool InteropClient::DoServerCompressedUnary() {
    355   const std::vector<bool> compressions = {true, false};
    356   for (size_t i = 0; i < compressions.size(); i++) {
    357     char* log_suffix;
    358     gpr_asprintf(&log_suffix, "(compression=%s)",
    359                  compressions[i] ? "true" : "false");
    360 
    361     gpr_log(GPR_DEBUG, "Sending unary request for compressed response %s.",
    362             log_suffix);
    363     SimpleRequest request;
    364     SimpleResponse response;
    365     request.mutable_response_compressed()->set_value(compressions[i]);
    366 
    367     if (!PerformLargeUnary(&request, &response, UnaryCompressionChecks)) {
    368       gpr_log(GPR_ERROR, "Request for compressed unary failed %s", log_suffix);
    369       gpr_free(log_suffix);
    370       return false;
    371     }
    372 
    373     gpr_log(GPR_DEBUG, "Request for compressed unary failed %s", log_suffix);
    374     gpr_free(log_suffix);
    375   }
    376 
    377   return true;
    378 }
    379 
    380 // Either abort() (unless do_not_abort_on_transient_failures_ is true) or return
    381 // false
    382 bool InteropClient::TransientFailureOrAbort() {
    383   if (do_not_abort_on_transient_failures_) {
    384     return false;
    385   }
    386 
    387   abort();
    388 }
    389 
    390 bool InteropClient::DoRequestStreaming() {
    391   gpr_log(GPR_DEBUG, "Sending request steaming rpc ...");
    392 
    393   ClientContext context;
    394   StreamingInputCallRequest request;
    395   StreamingInputCallResponse response;
    396 
    397   std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
    398       serviceStub_.Get()->StreamingInputCall(&context, &response));
    399 
    400   int aggregated_payload_size = 0;
    401   for (size_t i = 0; i < request_stream_sizes.size(); ++i) {
    402     Payload* payload = request.mutable_payload();
    403     payload->set_body(grpc::string(request_stream_sizes[i], '\0'));
    404     if (!stream->Write(request)) {
    405       gpr_log(GPR_ERROR, "DoRequestStreaming(): stream->Write() failed");
    406       return TransientFailureOrAbort();
    407     }
    408     aggregated_payload_size += request_stream_sizes[i];
    409   }
    410   GPR_ASSERT(stream->WritesDone());
    411 
    412   Status s = stream->Finish();
    413   if (!AssertStatusOk(s, context.debug_error_string())) {
    414     return false;
    415   }
    416 
    417   GPR_ASSERT(response.aggregated_payload_size() == aggregated_payload_size);
    418   return true;
    419 }
    420 
    421 bool InteropClient::DoResponseStreaming() {
    422   gpr_log(GPR_DEBUG, "Receiving response streaming rpc ...");
    423 
    424   ClientContext context;
    425   StreamingOutputCallRequest request;
    426   for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
    427     ResponseParameters* response_parameter = request.add_response_parameters();
    428     response_parameter->set_size(response_stream_sizes[i]);
    429   }
    430   StreamingOutputCallResponse response;
    431   std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
    432       serviceStub_.Get()->StreamingOutputCall(&context, request));
    433 
    434   unsigned int i = 0;
    435   while (stream->Read(&response)) {
    436     GPR_ASSERT(response.payload().body() ==
    437                grpc::string(response_stream_sizes[i], '\0'));
    438     ++i;
    439   }
    440 
    441   if (i < response_stream_sizes.size()) {
    442     // stream->Read() failed before reading all the expected messages. This is
    443     // most likely due to connection failure.
    444     gpr_log(GPR_ERROR,
    445             "DoResponseStreaming(): Read fewer streams (%d) than "
    446             "response_stream_sizes.size() (%" PRIuPTR ")",
    447             i, response_stream_sizes.size());
    448     return TransientFailureOrAbort();
    449   }
    450 
    451   Status s = stream->Finish();
    452   if (!AssertStatusOk(s, context.debug_error_string())) {
    453     return false;
    454   }
    455 
    456   gpr_log(GPR_DEBUG, "Response streaming done.");
    457   return true;
    458 }
    459 
    460 bool InteropClient::DoClientCompressedStreaming() {
    461   // Probing for compression-checks support.
    462   ClientContext probe_context;
    463   StreamingInputCallRequest probe_req;
    464   StreamingInputCallResponse probe_res;
    465 
    466   probe_context.set_compression_algorithm(GRPC_COMPRESS_NONE);
    467   probe_req.mutable_expect_compressed()->set_value(true);  // lies!
    468   probe_req.mutable_payload()->set_body(grpc::string(27182, '\0'));
    469 
    470   gpr_log(GPR_DEBUG, "Sending probe for compressed streaming request.");
    471 
    472   std::unique_ptr<ClientWriter<StreamingInputCallRequest>> probe_stream(
    473       serviceStub_.Get()->StreamingInputCall(&probe_context, &probe_res));
    474 
    475   if (!probe_stream->Write(probe_req)) {
    476     gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__);
    477     return TransientFailureOrAbort();
    478   }
    479   Status s = probe_stream->Finish();
    480   if (s.error_code() != grpc::StatusCode::INVALID_ARGUMENT) {
    481     // The server isn't able to evaluate incoming compression, making the rest
    482     // of this test moot.
    483     gpr_log(GPR_DEBUG, "Compressed streaming request probe failed");
    484     return false;
    485   }
    486   gpr_log(GPR_DEBUG,
    487           "Compressed streaming request probe succeeded. Proceeding.");
    488 
    489   ClientContext context;
    490   StreamingInputCallRequest request;
    491   StreamingInputCallResponse response;
    492 
    493   context.set_compression_algorithm(GRPC_COMPRESS_GZIP);
    494   std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
    495       serviceStub_.Get()->StreamingInputCall(&context, &response));
    496 
    497   request.mutable_payload()->set_body(grpc::string(27182, '\0'));
    498   request.mutable_expect_compressed()->set_value(true);
    499   gpr_log(GPR_DEBUG, "Sending streaming request with compression enabled");
    500   if (!stream->Write(request)) {
    501     gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__);
    502     return TransientFailureOrAbort();
    503   }
    504 
    505   WriteOptions wopts;
    506   wopts.set_no_compression();
    507   request.mutable_payload()->set_body(grpc::string(45904, '\0'));
    508   request.mutable_expect_compressed()->set_value(false);
    509   gpr_log(GPR_DEBUG, "Sending streaming request with compression disabled");
    510   if (!stream->Write(request, wopts)) {
    511     gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__);
    512     return TransientFailureOrAbort();
    513   }
    514   GPR_ASSERT(stream->WritesDone());
    515 
    516   s = stream->Finish();
    517   if (!AssertStatusOk(s, context.debug_error_string())) {
    518     return false;
    519   }
    520 
    521   return true;
    522 }
    523 
    524 bool InteropClient::DoServerCompressedStreaming() {
    525   const std::vector<bool> compressions = {true, false};
    526   const std::vector<int> sizes = {31415, 92653};
    527 
    528   ClientContext context;
    529   InteropClientContextInspector inspector(context);
    530   StreamingOutputCallRequest request;
    531 
    532   GPR_ASSERT(compressions.size() == sizes.size());
    533   for (size_t i = 0; i < sizes.size(); i++) {
    534     char* log_suffix;
    535     gpr_asprintf(&log_suffix, "(compression=%s; size=%d)",
    536                  compressions[i] ? "true" : "false", sizes[i]);
    537 
    538     gpr_log(GPR_DEBUG, "Sending request streaming rpc %s.", log_suffix);
    539     gpr_free(log_suffix);
    540 
    541     ResponseParameters* const response_parameter =
    542         request.add_response_parameters();
    543     response_parameter->mutable_compressed()->set_value(compressions[i]);
    544     response_parameter->set_size(sizes[i]);
    545   }
    546   std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
    547       serviceStub_.Get()->StreamingOutputCall(&context, request));
    548 
    549   size_t k = 0;
    550   StreamingOutputCallResponse response;
    551   while (stream->Read(&response)) {
    552     // Payload size checks.
    553     GPR_ASSERT(response.payload().body() ==
    554                grpc::string(request.response_parameters(k).size(), '\0'));
    555 
    556     // Compression checks.
    557     GPR_ASSERT(request.response_parameters(k).has_compressed());
    558     if (request.response_parameters(k).compressed().value()) {
    559       GPR_ASSERT(inspector.GetCallCompressionAlgorithm() > GRPC_COMPRESS_NONE);
    560       GPR_ASSERT(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS);
    561     } else {
    562       // requested *no* compression.
    563       GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS));
    564     }
    565     ++k;
    566   }
    567 
    568   if (k < sizes.size()) {
    569     // stream->Read() failed before reading all the expected messages. This
    570     // is most likely due to a connection failure.
    571     gpr_log(GPR_ERROR,
    572             "%s(): Responses read (k=%" PRIuPTR
    573             ") is less than the expected number of  messages (%" PRIuPTR ").",
    574             __func__, k, sizes.size());
    575     return TransientFailureOrAbort();
    576   }
    577 
    578   Status s = stream->Finish();
    579   if (!AssertStatusOk(s, context.debug_error_string())) {
    580     return false;
    581   }
    582   return true;
    583 }
    584 
    585 bool InteropClient::DoResponseStreamingWithSlowConsumer() {
    586   gpr_log(GPR_DEBUG, "Receiving response streaming rpc with slow consumer ...");
    587 
    588   ClientContext context;
    589   StreamingOutputCallRequest request;
    590 
    591   for (int i = 0; i < kNumResponseMessages; ++i) {
    592     ResponseParameters* response_parameter = request.add_response_parameters();
    593     response_parameter->set_size(kResponseMessageSize);
    594   }
    595   StreamingOutputCallResponse response;
    596   std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
    597       serviceStub_.Get()->StreamingOutputCall(&context, request));
    598 
    599   int i = 0;
    600   while (stream->Read(&response)) {
    601     GPR_ASSERT(response.payload().body() ==
    602                grpc::string(kResponseMessageSize, '\0'));
    603     gpr_log(GPR_DEBUG, "received message %d", i);
    604     gpr_sleep_until(gpr_time_add(
    605         gpr_now(GPR_CLOCK_REALTIME),
    606         gpr_time_from_millis(kReceiveDelayMilliSeconds, GPR_TIMESPAN)));
    607     ++i;
    608   }
    609 
    610   if (i < kNumResponseMessages) {
    611     gpr_log(GPR_ERROR,
    612             "DoResponseStreamingWithSlowConsumer(): Responses read (i=%d) is "
    613             "less than the expected messages (i.e kNumResponseMessages = %d)",
    614             i, kNumResponseMessages);
    615 
    616     return TransientFailureOrAbort();
    617   }
    618 
    619   Status s = stream->Finish();
    620   if (!AssertStatusOk(s, context.debug_error_string())) {
    621     return false;
    622   }
    623 
    624   gpr_log(GPR_DEBUG, "Response streaming done.");
    625   return true;
    626 }
    627 
    628 bool InteropClient::DoHalfDuplex() {
    629   gpr_log(GPR_DEBUG, "Sending half-duplex streaming rpc ...");
    630 
    631   ClientContext context;
    632   std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
    633                                      StreamingOutputCallResponse>>
    634       stream(serviceStub_.Get()->HalfDuplexCall(&context));
    635 
    636   StreamingOutputCallRequest request;
    637   ResponseParameters* response_parameter = request.add_response_parameters();
    638   for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
    639     response_parameter->set_size(response_stream_sizes[i]);
    640 
    641     if (!stream->Write(request)) {
    642       gpr_log(GPR_ERROR, "DoHalfDuplex(): stream->Write() failed. i=%d", i);
    643       return TransientFailureOrAbort();
    644     }
    645   }
    646   stream->WritesDone();
    647 
    648   unsigned int i = 0;
    649   StreamingOutputCallResponse response;
    650   while (stream->Read(&response)) {
    651     GPR_ASSERT(response.payload().body() ==
    652                grpc::string(response_stream_sizes[i], '\0'));
    653     ++i;
    654   }
    655 
    656   if (i < response_stream_sizes.size()) {
    657     // stream->Read() failed before reading all the expected messages. This is
    658     // most likely due to a connection failure
    659     gpr_log(GPR_ERROR,
    660             "DoHalfDuplex(): Responses read (i=%d) are less than the expected "
    661             "number of messages response_stream_sizes.size() (%" PRIuPTR ")",
    662             i, response_stream_sizes.size());
    663     return TransientFailureOrAbort();
    664   }
    665 
    666   Status s = stream->Finish();
    667   if (!AssertStatusOk(s, context.debug_error_string())) {
    668     return false;
    669   }
    670 
    671   gpr_log(GPR_DEBUG, "Half-duplex streaming rpc done.");
    672   return true;
    673 }
    674 
    675 bool InteropClient::DoPingPong() {
    676   gpr_log(GPR_DEBUG, "Sending Ping Pong streaming rpc ...");
    677 
    678   ClientContext context;
    679   std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
    680                                      StreamingOutputCallResponse>>
    681       stream(serviceStub_.Get()->FullDuplexCall(&context));
    682 
    683   StreamingOutputCallRequest request;
    684   ResponseParameters* response_parameter = request.add_response_parameters();
    685   Payload* payload = request.mutable_payload();
    686   StreamingOutputCallResponse response;
    687 
    688   for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
    689     response_parameter->set_size(response_stream_sizes[i]);
    690     payload->set_body(grpc::string(request_stream_sizes[i], '\0'));
    691 
    692     if (!stream->Write(request)) {
    693       gpr_log(GPR_ERROR, "DoPingPong(): stream->Write() failed. i: %d", i);
    694       return TransientFailureOrAbort();
    695     }
    696 
    697     if (!stream->Read(&response)) {
    698       gpr_log(GPR_ERROR, "DoPingPong(): stream->Read() failed. i:%d", i);
    699       return TransientFailureOrAbort();
    700     }
    701 
    702     GPR_ASSERT(response.payload().body() ==
    703                grpc::string(response_stream_sizes[i], '\0'));
    704   }
    705 
    706   stream->WritesDone();
    707 
    708   GPR_ASSERT(!stream->Read(&response));
    709 
    710   Status s = stream->Finish();
    711   if (!AssertStatusOk(s, context.debug_error_string())) {
    712     return false;
    713   }
    714 
    715   gpr_log(GPR_DEBUG, "Ping pong streaming done.");
    716   return true;
    717 }
    718 
    719 bool InteropClient::DoCancelAfterBegin() {
    720   gpr_log(GPR_DEBUG, "Sending request streaming rpc ...");
    721 
    722   ClientContext context;
    723   StreamingInputCallRequest request;
    724   StreamingInputCallResponse response;
    725 
    726   std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
    727       serviceStub_.Get()->StreamingInputCall(&context, &response));
    728 
    729   gpr_log(GPR_DEBUG, "Trying to cancel...");
    730   context.TryCancel();
    731   Status s = stream->Finish();
    732 
    733   if (!AssertStatusCode(s, StatusCode::CANCELLED,
    734                         context.debug_error_string())) {
    735     return false;
    736   }
    737 
    738   gpr_log(GPR_DEBUG, "Canceling streaming done.");
    739   return true;
    740 }
    741 
    742 bool InteropClient::DoCancelAfterFirstResponse() {
    743   gpr_log(GPR_DEBUG, "Sending Ping Pong streaming rpc ...");
    744 
    745   ClientContext context;
    746   std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
    747                                      StreamingOutputCallResponse>>
    748       stream(serviceStub_.Get()->FullDuplexCall(&context));
    749 
    750   StreamingOutputCallRequest request;
    751   ResponseParameters* response_parameter = request.add_response_parameters();
    752   response_parameter->set_size(31415);
    753   request.mutable_payload()->set_body(grpc::string(27182, '\0'));
    754   StreamingOutputCallResponse response;
    755 
    756   if (!stream->Write(request)) {
    757     gpr_log(GPR_ERROR, "DoCancelAfterFirstResponse(): stream->Write() failed");
    758     return TransientFailureOrAbort();
    759   }
    760 
    761   if (!stream->Read(&response)) {
    762     gpr_log(GPR_ERROR, "DoCancelAfterFirstResponse(): stream->Read failed");
    763     return TransientFailureOrAbort();
    764   }
    765   GPR_ASSERT(response.payload().body() == grpc::string(31415, '\0'));
    766 
    767   gpr_log(GPR_DEBUG, "Trying to cancel...");
    768   context.TryCancel();
    769 
    770   Status s = stream->Finish();
    771   gpr_log(GPR_DEBUG, "Canceling pingpong streaming done.");
    772   return true;
    773 }
    774 
    775 bool InteropClient::DoTimeoutOnSleepingServer() {
    776   gpr_log(GPR_DEBUG,
    777           "Sending Ping Pong streaming rpc with a short deadline...");
    778 
    779   ClientContext context;
    780   std::chrono::system_clock::time_point deadline =
    781       std::chrono::system_clock::now() + std::chrono::milliseconds(1);
    782   context.set_deadline(deadline);
    783   std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
    784                                      StreamingOutputCallResponse>>
    785       stream(serviceStub_.Get()->FullDuplexCall(&context));
    786 
    787   StreamingOutputCallRequest request;
    788   request.mutable_payload()->set_body(grpc::string(27182, '\0'));
    789   stream->Write(request);
    790 
    791   Status s = stream->Finish();
    792   if (!AssertStatusCode(s, StatusCode::DEADLINE_EXCEEDED,
    793                         context.debug_error_string())) {
    794     return false;
    795   }
    796 
    797   gpr_log(GPR_DEBUG, "Pingpong streaming timeout done.");
    798   return true;
    799 }
    800 
    801 bool InteropClient::DoEmptyStream() {
    802   gpr_log(GPR_DEBUG, "Starting empty_stream.");
    803 
    804   ClientContext context;
    805   std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
    806                                      StreamingOutputCallResponse>>
    807       stream(serviceStub_.Get()->FullDuplexCall(&context));
    808   stream->WritesDone();
    809   StreamingOutputCallResponse response;
    810   GPR_ASSERT(stream->Read(&response) == false);
    811 
    812   Status s = stream->Finish();
    813   if (!AssertStatusOk(s, context.debug_error_string())) {
    814     return false;
    815   }
    816 
    817   gpr_log(GPR_DEBUG, "empty_stream done.");
    818   return true;
    819 }
    820 
    821 bool InteropClient::DoStatusWithMessage() {
    822   gpr_log(GPR_DEBUG,
    823           "Sending RPC with a request for status code 2 and message");
    824 
    825   const grpc::StatusCode test_code = grpc::StatusCode::UNKNOWN;
    826   const grpc::string test_msg = "This is a test message";
    827 
    828   // Test UnaryCall.
    829   ClientContext context;
    830   SimpleRequest request;
    831   SimpleResponse response;
    832   EchoStatus* requested_status = request.mutable_response_status();
    833   requested_status->set_code(test_code);
    834   requested_status->set_message(test_msg);
    835   Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
    836   if (!AssertStatusCode(s, grpc::StatusCode::UNKNOWN,
    837                         context.debug_error_string())) {
    838     return false;
    839   }
    840   GPR_ASSERT(s.error_message() == test_msg);
    841 
    842   // Test FullDuplexCall.
    843   ClientContext stream_context;
    844   std::shared_ptr<ClientReaderWriter<StreamingOutputCallRequest,
    845                                      StreamingOutputCallResponse>>
    846       stream(serviceStub_.Get()->FullDuplexCall(&stream_context));
    847   StreamingOutputCallRequest streaming_request;
    848   requested_status = streaming_request.mutable_response_status();
    849   requested_status->set_code(test_code);
    850   requested_status->set_message(test_msg);
    851   stream->Write(streaming_request);
    852   stream->WritesDone();
    853   StreamingOutputCallResponse streaming_response;
    854   while (stream->Read(&streaming_response))
    855     ;
    856   s = stream->Finish();
    857   if (!AssertStatusCode(s, grpc::StatusCode::UNKNOWN,
    858                         context.debug_error_string())) {
    859     return false;
    860   }
    861   GPR_ASSERT(s.error_message() == test_msg);
    862 
    863   gpr_log(GPR_DEBUG, "Done testing Status and Message");
    864   return true;
    865 }
    866 
    867 bool InteropClient::DoCacheableUnary() {
    868   gpr_log(GPR_DEBUG, "Sending RPC with cacheable response");
    869 
    870   // Create request with current timestamp
    871   gpr_timespec ts = gpr_now(GPR_CLOCK_PRECISE);
    872   std::string timestamp =
    873       std::to_string(static_cast<long long unsigned>(ts.tv_nsec));
    874   SimpleRequest request;
    875   request.mutable_payload()->set_body(timestamp.c_str(), timestamp.size());
    876 
    877   // Request 1
    878   ClientContext context1;
    879   SimpleResponse response1;
    880   context1.set_cacheable(true);
    881   // Add fake user IP since some proxy's (GFE) won't cache requests from
    882   // localhost.
    883   context1.AddMetadata("x-user-ip", "1.2.3.4");
    884   Status s1 =
    885       serviceStub_.Get()->CacheableUnaryCall(&context1, request, &response1);
    886   if (!AssertStatusOk(s1, context1.debug_error_string())) {
    887     return false;
    888   }
    889   gpr_log(GPR_DEBUG, "response 1 payload: %s",
    890           response1.payload().body().c_str());
    891 
    892   // Request 2
    893   ClientContext context2;
    894   SimpleResponse response2;
    895   context2.set_cacheable(true);
    896   context2.AddMetadata("x-user-ip", "1.2.3.4");
    897   Status s2 =
    898       serviceStub_.Get()->CacheableUnaryCall(&context2, request, &response2);
    899   if (!AssertStatusOk(s2, context2.debug_error_string())) {
    900     return false;
    901   }
    902   gpr_log(GPR_DEBUG, "response 2 payload: %s",
    903           response2.payload().body().c_str());
    904 
    905   // Check that the body is same for both requests. It will be the same if the
    906   // second response is a cached copy of the first response
    907   GPR_ASSERT(response2.payload().body() == response1.payload().body());
    908 
    909   // Request 3
    910   // Modify the request body so it will not get a cache hit
    911   ts = gpr_now(GPR_CLOCK_PRECISE);
    912   timestamp = std::to_string(static_cast<long long unsigned>(ts.tv_nsec));
    913   SimpleRequest request1;
    914   request1.mutable_payload()->set_body(timestamp.c_str(), timestamp.size());
    915   ClientContext context3;
    916   SimpleResponse response3;
    917   context3.set_cacheable(true);
    918   context3.AddMetadata("x-user-ip", "1.2.3.4");
    919   Status s3 =
    920       serviceStub_.Get()->CacheableUnaryCall(&context3, request1, &response3);
    921   if (!AssertStatusOk(s3, context3.debug_error_string())) {
    922     return false;
    923   }
    924   gpr_log(GPR_DEBUG, "response 3 payload: %s",
    925           response3.payload().body().c_str());
    926 
    927   // Check that the response is different from the previous response.
    928   GPR_ASSERT(response3.payload().body() != response1.payload().body());
    929   return true;
    930 }
    931 
    932 bool InteropClient::DoCustomMetadata() {
    933   const grpc::string kEchoInitialMetadataKey("x-grpc-test-echo-initial");
    934   const grpc::string kInitialMetadataValue("test_initial_metadata_value");
    935   const grpc::string kEchoTrailingBinMetadataKey(
    936       "x-grpc-test-echo-trailing-bin");
    937   const grpc::string kTrailingBinValue("\x0a\x0b\x0a\x0b\x0a\x0b");
    938   ;
    939 
    940   {
    941     gpr_log(GPR_DEBUG, "Sending RPC with custom metadata");
    942     ClientContext context;
    943     context.AddMetadata(kEchoInitialMetadataKey, kInitialMetadataValue);
    944     context.AddMetadata(kEchoTrailingBinMetadataKey, kTrailingBinValue);
    945     SimpleRequest request;
    946     SimpleResponse response;
    947     request.set_response_size(kLargeResponseSize);
    948     grpc::string payload(kLargeRequestSize, '\0');
    949     request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
    950 
    951     Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
    952     if (!AssertStatusOk(s, context.debug_error_string())) {
    953       return false;
    954     }
    955 
    956     const auto& server_initial_metadata = context.GetServerInitialMetadata();
    957     auto iter = server_initial_metadata.find(kEchoInitialMetadataKey);
    958     GPR_ASSERT(iter != server_initial_metadata.end());
    959     GPR_ASSERT(iter->second == kInitialMetadataValue);
    960     const auto& server_trailing_metadata = context.GetServerTrailingMetadata();
    961     iter = server_trailing_metadata.find(kEchoTrailingBinMetadataKey);
    962     GPR_ASSERT(iter != server_trailing_metadata.end());
    963     GPR_ASSERT(grpc::string(iter->second.begin(), iter->second.end()) ==
    964                kTrailingBinValue);
    965 
    966     gpr_log(GPR_DEBUG, "Done testing RPC with custom metadata");
    967   }
    968 
    969   {
    970     gpr_log(GPR_DEBUG, "Sending stream with custom metadata");
    971     ClientContext context;
    972     context.AddMetadata(kEchoInitialMetadataKey, kInitialMetadataValue);
    973     context.AddMetadata(kEchoTrailingBinMetadataKey, kTrailingBinValue);
    974     std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
    975                                        StreamingOutputCallResponse>>
    976         stream(serviceStub_.Get()->FullDuplexCall(&context));
    977 
    978     StreamingOutputCallRequest request;
    979     ResponseParameters* response_parameter = request.add_response_parameters();
    980     response_parameter->set_size(kLargeResponseSize);
    981     grpc::string payload(kLargeRequestSize, '\0');
    982     request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
    983     StreamingOutputCallResponse response;
    984 
    985     if (!stream->Write(request)) {
    986       gpr_log(GPR_ERROR, "DoCustomMetadata(): stream->Write() failed");
    987       return TransientFailureOrAbort();
    988     }
    989 
    990     stream->WritesDone();
    991 
    992     if (!stream->Read(&response)) {
    993       gpr_log(GPR_ERROR, "DoCustomMetadata(): stream->Read() failed");
    994       return TransientFailureOrAbort();
    995     }
    996 
    997     GPR_ASSERT(response.payload().body() ==
    998                grpc::string(kLargeResponseSize, '\0'));
    999 
   1000     GPR_ASSERT(!stream->Read(&response));
   1001 
   1002     Status s = stream->Finish();
   1003     if (!AssertStatusOk(s, context.debug_error_string())) {
   1004       return false;
   1005     }
   1006 
   1007     const auto& server_initial_metadata = context.GetServerInitialMetadata();
   1008     auto iter = server_initial_metadata.find(kEchoInitialMetadataKey);
   1009     GPR_ASSERT(iter != server_initial_metadata.end());
   1010     GPR_ASSERT(iter->second == kInitialMetadataValue);
   1011     const auto& server_trailing_metadata = context.GetServerTrailingMetadata();
   1012     iter = server_trailing_metadata.find(kEchoTrailingBinMetadataKey);
   1013     GPR_ASSERT(iter != server_trailing_metadata.end());
   1014     GPR_ASSERT(grpc::string(iter->second.begin(), iter->second.end()) ==
   1015                kTrailingBinValue);
   1016 
   1017     gpr_log(GPR_DEBUG, "Done testing stream with custom metadata");
   1018   }
   1019 
   1020   return true;
   1021 }
   1022 
   1023 bool InteropClient::DoRpcSoakTest(int32_t soak_iterations) {
   1024   gpr_log(GPR_DEBUG, "Sending %d RPCs...", soak_iterations);
   1025   GPR_ASSERT(soak_iterations > 0);
   1026   SimpleRequest request;
   1027   SimpleResponse response;
   1028   for (int i = 0; i < soak_iterations; ++i) {
   1029     if (!PerformLargeUnary(&request, &response)) {
   1030       gpr_log(GPR_ERROR, "rpc_soak test failed on iteration %d", i);
   1031       return false;
   1032     }
   1033   }
   1034   gpr_log(GPR_DEBUG, "rpc_soak test done.");
   1035   return true;
   1036 }
   1037 
   1038 bool InteropClient::DoChannelSoakTest(int32_t soak_iterations) {
   1039   gpr_log(GPR_DEBUG, "Sending %d RPCs, tearing down the channel each time...",
   1040           soak_iterations);
   1041   GPR_ASSERT(soak_iterations > 0);
   1042   SimpleRequest request;
   1043   SimpleResponse response;
   1044   for (int i = 0; i < soak_iterations; ++i) {
   1045     serviceStub_.ResetChannel();
   1046     if (!PerformLargeUnary(&request, &response)) {
   1047       gpr_log(GPR_ERROR, "channel_soak test failed on iteration %d", i);
   1048       return false;
   1049     }
   1050   }
   1051   gpr_log(GPR_DEBUG, "channel_soak test done.");
   1052   return true;
   1053 }
   1054 
   1055 bool InteropClient::DoLongLivedChannelTest(int32_t soak_iterations,
   1056                                            int32_t iteration_interval) {
   1057   gpr_log(GPR_DEBUG, "Sending %d RPCs...", soak_iterations);
   1058   GPR_ASSERT(soak_iterations > 0);
   1059   GPR_ASSERT(iteration_interval > 0);
   1060   SimpleRequest request;
   1061   SimpleResponse response;
   1062   int num_failures = 0;
   1063   for (int i = 0; i < soak_iterations; ++i) {
   1064     gpr_log(GPR_DEBUG, "Sending RPC number %d...", i);
   1065     if (!PerformLargeUnary(&request, &response)) {
   1066       gpr_log(GPR_ERROR, "Iteration %d failed.", i);
   1067       num_failures++;
   1068     }
   1069     gpr_sleep_until(
   1070         gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
   1071                      gpr_time_from_seconds(iteration_interval, GPR_TIMESPAN)));
   1072   }
   1073   if (num_failures == 0) {
   1074     gpr_log(GPR_DEBUG, "long_lived_channel test done.");
   1075     return true;
   1076   } else {
   1077     gpr_log(GPR_DEBUG, "long_lived_channel test failed with %d rpc failures.",
   1078             num_failures);
   1079     return false;
   1080   }
   1081 }
   1082 
   1083 bool InteropClient::DoUnimplementedService() {
   1084   gpr_log(GPR_DEBUG, "Sending a request for an unimplemented service...");
   1085 
   1086   Empty request;
   1087   Empty response;
   1088   ClientContext context;
   1089 
   1090   UnimplementedService::Stub* stub = serviceStub_.GetUnimplementedServiceStub();
   1091 
   1092   Status s = stub->UnimplementedCall(&context, request, &response);
   1093 
   1094   if (!AssertStatusCode(s, StatusCode::UNIMPLEMENTED,
   1095                         context.debug_error_string())) {
   1096     return false;
   1097   }
   1098 
   1099   gpr_log(GPR_DEBUG, "unimplemented service done.");
   1100   return true;
   1101 }
   1102 
   1103 bool InteropClient::DoUnimplementedMethod() {
   1104   gpr_log(GPR_DEBUG, "Sending a request for an unimplemented rpc...");
   1105 
   1106   Empty request;
   1107   Empty response;
   1108   ClientContext context;
   1109 
   1110   Status s =
   1111       serviceStub_.Get()->UnimplementedCall(&context, request, &response);
   1112 
   1113   if (!AssertStatusCode(s, StatusCode::UNIMPLEMENTED,
   1114                         context.debug_error_string())) {
   1115     return false;
   1116   }
   1117 
   1118   gpr_log(GPR_DEBUG, "unimplemented rpc done.");
   1119   return true;
   1120 }
   1121 
   1122 }  // namespace testing
   1123 }  // namespace grpc
   1124