1 /* 2 * 3 * Copyright 2015-2016 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #include <cinttypes> 20 #include <fstream> 21 #include <memory> 22 #include <utility> 23 24 #include <grpc/grpc.h> 25 #include <grpc/support/alloc.h> 26 #include <grpc/support/log.h> 27 #include <grpc/support/string_util.h> 28 #include <grpc/support/time.h> 29 #include <grpcpp/channel.h> 30 #include <grpcpp/client_context.h> 31 #include <grpcpp/security/credentials.h> 32 33 #include "src/core/lib/transport/byte_stream.h" 34 #include "src/proto/grpc/testing/empty.pb.h" 35 #include "src/proto/grpc/testing/messages.pb.h" 36 #include "src/proto/grpc/testing/test.grpc.pb.h" 37 #include "test/cpp/interop/client_helper.h" 38 #include "test/cpp/interop/interop_client.h" 39 40 namespace grpc { 41 namespace testing { 42 43 namespace { 44 // The same value is defined by the Java client. 45 const std::vector<int> request_stream_sizes = {27182, 8, 1828, 45904}; 46 const std::vector<int> response_stream_sizes = {31415, 9, 2653, 58979}; 47 const int kNumResponseMessages = 2000; 48 const int kResponseMessageSize = 1030; 49 const int kReceiveDelayMilliSeconds = 20; 50 const int kLargeRequestSize = 271828; 51 const int kLargeResponseSize = 314159; 52 53 void NoopChecks(const InteropClientContextInspector& inspector, 54 const SimpleRequest* request, const SimpleResponse* response) {} 55 56 void UnaryCompressionChecks(const InteropClientContextInspector& inspector, 57 const SimpleRequest* request, 58 const SimpleResponse* response) { 59 const grpc_compression_algorithm received_compression = 60 inspector.GetCallCompressionAlgorithm(); 61 if (request->response_compressed().value()) { 62 if (received_compression == GRPC_COMPRESS_NONE) { 63 // Requested some compression, got NONE. This is an error. 64 gpr_log(GPR_ERROR, 65 "Failure: Requested compression but got uncompressed response " 66 "from server."); 67 abort(); 68 } 69 GPR_ASSERT(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS); 70 } else { 71 // Didn't request compression -> make sure the response is uncompressed 72 GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)); 73 } 74 } 75 } // namespace 76 77 InteropClient::ServiceStub::ServiceStub( 78 ChannelCreationFunc channel_creation_func, bool new_stub_every_call) 79 : channel_creation_func_(channel_creation_func), 80 channel_(channel_creation_func_()), 81 new_stub_every_call_(new_stub_every_call) { 82 // If new_stub_every_call is false, then this is our chance to initialize 83 // stub_. (see Get()) 84 if (!new_stub_every_call) { 85 stub_ = TestService::NewStub(channel_); 86 } 87 } 88 89 TestService::Stub* InteropClient::ServiceStub::Get() { 90 if (new_stub_every_call_) { 91 stub_ = TestService::NewStub(channel_); 92 } 93 94 return stub_.get(); 95 } 96 97 UnimplementedService::Stub* 98 InteropClient::ServiceStub::GetUnimplementedServiceStub() { 99 if (unimplemented_service_stub_ == nullptr) { 100 unimplemented_service_stub_ = UnimplementedService::NewStub(channel_); 101 } 102 return unimplemented_service_stub_.get(); 103 } 104 105 void InteropClient::ServiceStub::ResetChannel() { 106 channel_ = channel_creation_func_(); 107 if (!new_stub_every_call_) { 108 stub_ = TestService::NewStub(channel_); 109 } 110 } 111 112 InteropClient::InteropClient(ChannelCreationFunc channel_creation_func, 113 bool new_stub_every_test_case, 114 bool do_not_abort_on_transient_failures) 115 : serviceStub_(channel_creation_func, new_stub_every_test_case), 116 do_not_abort_on_transient_failures_(do_not_abort_on_transient_failures) {} 117 118 bool InteropClient::AssertStatusOk(const Status& s, 119 const grpc::string& optional_debug_string) { 120 if (s.ok()) { 121 return true; 122 } 123 124 // Note: At this point, s.error_code is definitely not StatusCode::OK (we 125 // already checked for s.ok() above). So, the following will call abort() 126 // (unless s.error_code() corresponds to a transient failure and 127 // 'do_not_abort_on_transient_failures' is true) 128 return AssertStatusCode(s, StatusCode::OK, optional_debug_string); 129 } 130 131 bool InteropClient::AssertStatusCode( 132 const Status& s, StatusCode expected_code, 133 const grpc::string& optional_debug_string) { 134 if (s.error_code() == expected_code) { 135 return true; 136 } 137 138 gpr_log(GPR_ERROR, 139 "Error status code: %d (expected: %d), message: %s," 140 " debug string: %s", 141 s.error_code(), expected_code, s.error_message().c_str(), 142 optional_debug_string.c_str()); 143 144 // In case of transient transient/retryable failures (like a broken 145 // connection) we may or may not abort (see TransientFailureOrAbort()) 146 if (s.error_code() == grpc::StatusCode::UNAVAILABLE) { 147 return TransientFailureOrAbort(); 148 } 149 150 abort(); 151 } 152 153 bool InteropClient::DoEmpty() { 154 gpr_log(GPR_DEBUG, "Sending an empty rpc..."); 155 156 Empty request; 157 Empty response; 158 ClientContext context; 159 160 Status s = serviceStub_.Get()->EmptyCall(&context, request, &response); 161 162 if (!AssertStatusOk(s, context.debug_error_string())) { 163 return false; 164 } 165 166 gpr_log(GPR_DEBUG, "Empty rpc done."); 167 return true; 168 } 169 170 bool InteropClient::PerformLargeUnary(SimpleRequest* request, 171 SimpleResponse* response) { 172 return PerformLargeUnary(request, response, NoopChecks); 173 } 174 175 bool InteropClient::PerformLargeUnary(SimpleRequest* request, 176 SimpleResponse* response, 177 const CheckerFn& custom_checks_fn) { 178 ClientContext context; 179 InteropClientContextInspector inspector(context); 180 request->set_response_size(kLargeResponseSize); 181 grpc::string payload(kLargeRequestSize, '\0'); 182 request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize); 183 if (request->has_expect_compressed()) { 184 if (request->expect_compressed().value()) { 185 context.set_compression_algorithm(GRPC_COMPRESS_GZIP); 186 } else { 187 context.set_compression_algorithm(GRPC_COMPRESS_NONE); 188 } 189 } 190 191 Status s = serviceStub_.Get()->UnaryCall(&context, *request, response); 192 if (!AssertStatusOk(s, context.debug_error_string())) { 193 return false; 194 } 195 196 custom_checks_fn(inspector, request, response); 197 198 // Payload related checks. 199 GPR_ASSERT(response->payload().body() == 200 grpc::string(kLargeResponseSize, '\0')); 201 return true; 202 } 203 204 bool InteropClient::DoComputeEngineCreds( 205 const grpc::string& default_service_account, 206 const grpc::string& oauth_scope) { 207 gpr_log(GPR_DEBUG, 208 "Sending a large unary rpc with compute engine credentials ..."); 209 SimpleRequest request; 210 SimpleResponse response; 211 request.set_fill_username(true); 212 request.set_fill_oauth_scope(true); 213 214 if (!PerformLargeUnary(&request, &response)) { 215 return false; 216 } 217 218 gpr_log(GPR_DEBUG, "Got username %s", response.username().c_str()); 219 gpr_log(GPR_DEBUG, "Got oauth_scope %s", response.oauth_scope().c_str()); 220 GPR_ASSERT(!response.username().empty()); 221 GPR_ASSERT(response.username().c_str() == default_service_account); 222 GPR_ASSERT(!response.oauth_scope().empty()); 223 const char* oauth_scope_str = response.oauth_scope().c_str(); 224 GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos); 225 gpr_log(GPR_DEBUG, "Large unary with compute engine creds done."); 226 return true; 227 } 228 229 bool InteropClient::DoOauth2AuthToken(const grpc::string& username, 230 const grpc::string& oauth_scope) { 231 gpr_log(GPR_DEBUG, 232 "Sending a unary rpc with raw oauth2 access token credentials ..."); 233 SimpleRequest request; 234 SimpleResponse response; 235 request.set_fill_username(true); 236 request.set_fill_oauth_scope(true); 237 238 ClientContext context; 239 240 Status s = serviceStub_.Get()->UnaryCall(&context, request, &response); 241 242 if (!AssertStatusOk(s, context.debug_error_string())) { 243 return false; 244 } 245 246 GPR_ASSERT(!response.username().empty()); 247 GPR_ASSERT(!response.oauth_scope().empty()); 248 GPR_ASSERT(username == response.username()); 249 const char* oauth_scope_str = response.oauth_scope().c_str(); 250 GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos); 251 gpr_log(GPR_DEBUG, "Unary with oauth2 access token credentials done."); 252 return true; 253 } 254 255 bool InteropClient::DoPerRpcCreds(const grpc::string& json_key) { 256 gpr_log(GPR_DEBUG, "Sending a unary rpc with per-rpc JWT access token ..."); 257 SimpleRequest request; 258 SimpleResponse response; 259 request.set_fill_username(true); 260 261 ClientContext context; 262 std::chrono::seconds token_lifetime = std::chrono::hours(1); 263 std::shared_ptr<CallCredentials> creds = 264 ServiceAccountJWTAccessCredentials(json_key, token_lifetime.count()); 265 266 context.set_credentials(creds); 267 268 Status s = serviceStub_.Get()->UnaryCall(&context, request, &response); 269 270 if (!AssertStatusOk(s, context.debug_error_string())) { 271 return false; 272 } 273 274 GPR_ASSERT(!response.username().empty()); 275 GPR_ASSERT(json_key.find(response.username()) != grpc::string::npos); 276 gpr_log(GPR_DEBUG, "Unary with per-rpc JWT access token done."); 277 return true; 278 } 279 280 bool InteropClient::DoJwtTokenCreds(const grpc::string& username) { 281 gpr_log(GPR_DEBUG, 282 "Sending a large unary rpc with JWT token credentials ..."); 283 SimpleRequest request; 284 SimpleResponse response; 285 request.set_fill_username(true); 286 287 if (!PerformLargeUnary(&request, &response)) { 288 return false; 289 } 290 291 GPR_ASSERT(!response.username().empty()); 292 GPR_ASSERT(username.find(response.username()) != grpc::string::npos); 293 gpr_log(GPR_DEBUG, "Large unary with JWT token creds done."); 294 return true; 295 } 296 297 bool InteropClient::DoLargeUnary() { 298 gpr_log(GPR_DEBUG, "Sending a large unary rpc..."); 299 SimpleRequest request; 300 SimpleResponse response; 301 if (!PerformLargeUnary(&request, &response)) { 302 return false; 303 } 304 gpr_log(GPR_DEBUG, "Large unary done."); 305 return true; 306 } 307 308 bool InteropClient::DoClientCompressedUnary() { 309 // Probing for compression-checks support. 310 ClientContext probe_context; 311 SimpleRequest probe_req; 312 SimpleResponse probe_res; 313 314 probe_context.set_compression_algorithm(GRPC_COMPRESS_NONE); 315 probe_req.mutable_expect_compressed()->set_value(true); // lies! 316 317 probe_req.set_response_size(kLargeResponseSize); 318 probe_req.mutable_payload()->set_body(grpc::string(kLargeRequestSize, '\0')); 319 320 gpr_log(GPR_DEBUG, "Sending probe for compressed unary request."); 321 const Status s = 322 serviceStub_.Get()->UnaryCall(&probe_context, probe_req, &probe_res); 323 if (s.error_code() != grpc::StatusCode::INVALID_ARGUMENT) { 324 // The server isn't able to evaluate incoming compression, making the rest 325 // of this test moot. 326 gpr_log(GPR_DEBUG, "Compressed unary request probe failed"); 327 return false; 328 } 329 gpr_log(GPR_DEBUG, "Compressed unary request probe succeeded. Proceeding."); 330 331 const std::vector<bool> compressions = {true, false}; 332 for (size_t i = 0; i < compressions.size(); i++) { 333 char* log_suffix; 334 gpr_asprintf(&log_suffix, "(compression=%s)", 335 compressions[i] ? "true" : "false"); 336 337 gpr_log(GPR_DEBUG, "Sending compressed unary request %s.", log_suffix); 338 SimpleRequest request; 339 SimpleResponse response; 340 request.mutable_expect_compressed()->set_value(compressions[i]); 341 if (!PerformLargeUnary(&request, &response, UnaryCompressionChecks)) { 342 gpr_log(GPR_ERROR, "Compressed unary request failed %s", log_suffix); 343 gpr_free(log_suffix); 344 return false; 345 } 346 347 gpr_log(GPR_DEBUG, "Compressed unary request failed %s", log_suffix); 348 gpr_free(log_suffix); 349 } 350 351 return true; 352 } 353 354 bool InteropClient::DoServerCompressedUnary() { 355 const std::vector<bool> compressions = {true, false}; 356 for (size_t i = 0; i < compressions.size(); i++) { 357 char* log_suffix; 358 gpr_asprintf(&log_suffix, "(compression=%s)", 359 compressions[i] ? "true" : "false"); 360 361 gpr_log(GPR_DEBUG, "Sending unary request for compressed response %s.", 362 log_suffix); 363 SimpleRequest request; 364 SimpleResponse response; 365 request.mutable_response_compressed()->set_value(compressions[i]); 366 367 if (!PerformLargeUnary(&request, &response, UnaryCompressionChecks)) { 368 gpr_log(GPR_ERROR, "Request for compressed unary failed %s", log_suffix); 369 gpr_free(log_suffix); 370 return false; 371 } 372 373 gpr_log(GPR_DEBUG, "Request for compressed unary failed %s", log_suffix); 374 gpr_free(log_suffix); 375 } 376 377 return true; 378 } 379 380 // Either abort() (unless do_not_abort_on_transient_failures_ is true) or return 381 // false 382 bool InteropClient::TransientFailureOrAbort() { 383 if (do_not_abort_on_transient_failures_) { 384 return false; 385 } 386 387 abort(); 388 } 389 390 bool InteropClient::DoRequestStreaming() { 391 gpr_log(GPR_DEBUG, "Sending request steaming rpc ..."); 392 393 ClientContext context; 394 StreamingInputCallRequest request; 395 StreamingInputCallResponse response; 396 397 std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream( 398 serviceStub_.Get()->StreamingInputCall(&context, &response)); 399 400 int aggregated_payload_size = 0; 401 for (size_t i = 0; i < request_stream_sizes.size(); ++i) { 402 Payload* payload = request.mutable_payload(); 403 payload->set_body(grpc::string(request_stream_sizes[i], '\0')); 404 if (!stream->Write(request)) { 405 gpr_log(GPR_ERROR, "DoRequestStreaming(): stream->Write() failed"); 406 return TransientFailureOrAbort(); 407 } 408 aggregated_payload_size += request_stream_sizes[i]; 409 } 410 GPR_ASSERT(stream->WritesDone()); 411 412 Status s = stream->Finish(); 413 if (!AssertStatusOk(s, context.debug_error_string())) { 414 return false; 415 } 416 417 GPR_ASSERT(response.aggregated_payload_size() == aggregated_payload_size); 418 return true; 419 } 420 421 bool InteropClient::DoResponseStreaming() { 422 gpr_log(GPR_DEBUG, "Receiving response streaming rpc ..."); 423 424 ClientContext context; 425 StreamingOutputCallRequest request; 426 for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) { 427 ResponseParameters* response_parameter = request.add_response_parameters(); 428 response_parameter->set_size(response_stream_sizes[i]); 429 } 430 StreamingOutputCallResponse response; 431 std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream( 432 serviceStub_.Get()->StreamingOutputCall(&context, request)); 433 434 unsigned int i = 0; 435 while (stream->Read(&response)) { 436 GPR_ASSERT(response.payload().body() == 437 grpc::string(response_stream_sizes[i], '\0')); 438 ++i; 439 } 440 441 if (i < response_stream_sizes.size()) { 442 // stream->Read() failed before reading all the expected messages. This is 443 // most likely due to connection failure. 444 gpr_log(GPR_ERROR, 445 "DoResponseStreaming(): Read fewer streams (%d) than " 446 "response_stream_sizes.size() (%" PRIuPTR ")", 447 i, response_stream_sizes.size()); 448 return TransientFailureOrAbort(); 449 } 450 451 Status s = stream->Finish(); 452 if (!AssertStatusOk(s, context.debug_error_string())) { 453 return false; 454 } 455 456 gpr_log(GPR_DEBUG, "Response streaming done."); 457 return true; 458 } 459 460 bool InteropClient::DoClientCompressedStreaming() { 461 // Probing for compression-checks support. 462 ClientContext probe_context; 463 StreamingInputCallRequest probe_req; 464 StreamingInputCallResponse probe_res; 465 466 probe_context.set_compression_algorithm(GRPC_COMPRESS_NONE); 467 probe_req.mutable_expect_compressed()->set_value(true); // lies! 468 probe_req.mutable_payload()->set_body(grpc::string(27182, '\0')); 469 470 gpr_log(GPR_DEBUG, "Sending probe for compressed streaming request."); 471 472 std::unique_ptr<ClientWriter<StreamingInputCallRequest>> probe_stream( 473 serviceStub_.Get()->StreamingInputCall(&probe_context, &probe_res)); 474 475 if (!probe_stream->Write(probe_req)) { 476 gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__); 477 return TransientFailureOrAbort(); 478 } 479 Status s = probe_stream->Finish(); 480 if (s.error_code() != grpc::StatusCode::INVALID_ARGUMENT) { 481 // The server isn't able to evaluate incoming compression, making the rest 482 // of this test moot. 483 gpr_log(GPR_DEBUG, "Compressed streaming request probe failed"); 484 return false; 485 } 486 gpr_log(GPR_DEBUG, 487 "Compressed streaming request probe succeeded. Proceeding."); 488 489 ClientContext context; 490 StreamingInputCallRequest request; 491 StreamingInputCallResponse response; 492 493 context.set_compression_algorithm(GRPC_COMPRESS_GZIP); 494 std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream( 495 serviceStub_.Get()->StreamingInputCall(&context, &response)); 496 497 request.mutable_payload()->set_body(grpc::string(27182, '\0')); 498 request.mutable_expect_compressed()->set_value(true); 499 gpr_log(GPR_DEBUG, "Sending streaming request with compression enabled"); 500 if (!stream->Write(request)) { 501 gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__); 502 return TransientFailureOrAbort(); 503 } 504 505 WriteOptions wopts; 506 wopts.set_no_compression(); 507 request.mutable_payload()->set_body(grpc::string(45904, '\0')); 508 request.mutable_expect_compressed()->set_value(false); 509 gpr_log(GPR_DEBUG, "Sending streaming request with compression disabled"); 510 if (!stream->Write(request, wopts)) { 511 gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__); 512 return TransientFailureOrAbort(); 513 } 514 GPR_ASSERT(stream->WritesDone()); 515 516 s = stream->Finish(); 517 if (!AssertStatusOk(s, context.debug_error_string())) { 518 return false; 519 } 520 521 return true; 522 } 523 524 bool InteropClient::DoServerCompressedStreaming() { 525 const std::vector<bool> compressions = {true, false}; 526 const std::vector<int> sizes = {31415, 92653}; 527 528 ClientContext context; 529 InteropClientContextInspector inspector(context); 530 StreamingOutputCallRequest request; 531 532 GPR_ASSERT(compressions.size() == sizes.size()); 533 for (size_t i = 0; i < sizes.size(); i++) { 534 char* log_suffix; 535 gpr_asprintf(&log_suffix, "(compression=%s; size=%d)", 536 compressions[i] ? "true" : "false", sizes[i]); 537 538 gpr_log(GPR_DEBUG, "Sending request streaming rpc %s.", log_suffix); 539 gpr_free(log_suffix); 540 541 ResponseParameters* const response_parameter = 542 request.add_response_parameters(); 543 response_parameter->mutable_compressed()->set_value(compressions[i]); 544 response_parameter->set_size(sizes[i]); 545 } 546 std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream( 547 serviceStub_.Get()->StreamingOutputCall(&context, request)); 548 549 size_t k = 0; 550 StreamingOutputCallResponse response; 551 while (stream->Read(&response)) { 552 // Payload size checks. 553 GPR_ASSERT(response.payload().body() == 554 grpc::string(request.response_parameters(k).size(), '\0')); 555 556 // Compression checks. 557 GPR_ASSERT(request.response_parameters(k).has_compressed()); 558 if (request.response_parameters(k).compressed().value()) { 559 GPR_ASSERT(inspector.GetCallCompressionAlgorithm() > GRPC_COMPRESS_NONE); 560 GPR_ASSERT(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS); 561 } else { 562 // requested *no* compression. 563 GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)); 564 } 565 ++k; 566 } 567 568 if (k < sizes.size()) { 569 // stream->Read() failed before reading all the expected messages. This 570 // is most likely due to a connection failure. 571 gpr_log(GPR_ERROR, 572 "%s(): Responses read (k=%" PRIuPTR 573 ") is less than the expected number of messages (%" PRIuPTR ").", 574 __func__, k, sizes.size()); 575 return TransientFailureOrAbort(); 576 } 577 578 Status s = stream->Finish(); 579 if (!AssertStatusOk(s, context.debug_error_string())) { 580 return false; 581 } 582 return true; 583 } 584 585 bool InteropClient::DoResponseStreamingWithSlowConsumer() { 586 gpr_log(GPR_DEBUG, "Receiving response streaming rpc with slow consumer ..."); 587 588 ClientContext context; 589 StreamingOutputCallRequest request; 590 591 for (int i = 0; i < kNumResponseMessages; ++i) { 592 ResponseParameters* response_parameter = request.add_response_parameters(); 593 response_parameter->set_size(kResponseMessageSize); 594 } 595 StreamingOutputCallResponse response; 596 std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream( 597 serviceStub_.Get()->StreamingOutputCall(&context, request)); 598 599 int i = 0; 600 while (stream->Read(&response)) { 601 GPR_ASSERT(response.payload().body() == 602 grpc::string(kResponseMessageSize, '\0')); 603 gpr_log(GPR_DEBUG, "received message %d", i); 604 gpr_sleep_until(gpr_time_add( 605 gpr_now(GPR_CLOCK_REALTIME), 606 gpr_time_from_millis(kReceiveDelayMilliSeconds, GPR_TIMESPAN))); 607 ++i; 608 } 609 610 if (i < kNumResponseMessages) { 611 gpr_log(GPR_ERROR, 612 "DoResponseStreamingWithSlowConsumer(): Responses read (i=%d) is " 613 "less than the expected messages (i.e kNumResponseMessages = %d)", 614 i, kNumResponseMessages); 615 616 return TransientFailureOrAbort(); 617 } 618 619 Status s = stream->Finish(); 620 if (!AssertStatusOk(s, context.debug_error_string())) { 621 return false; 622 } 623 624 gpr_log(GPR_DEBUG, "Response streaming done."); 625 return true; 626 } 627 628 bool InteropClient::DoHalfDuplex() { 629 gpr_log(GPR_DEBUG, "Sending half-duplex streaming rpc ..."); 630 631 ClientContext context; 632 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest, 633 StreamingOutputCallResponse>> 634 stream(serviceStub_.Get()->HalfDuplexCall(&context)); 635 636 StreamingOutputCallRequest request; 637 ResponseParameters* response_parameter = request.add_response_parameters(); 638 for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) { 639 response_parameter->set_size(response_stream_sizes[i]); 640 641 if (!stream->Write(request)) { 642 gpr_log(GPR_ERROR, "DoHalfDuplex(): stream->Write() failed. i=%d", i); 643 return TransientFailureOrAbort(); 644 } 645 } 646 stream->WritesDone(); 647 648 unsigned int i = 0; 649 StreamingOutputCallResponse response; 650 while (stream->Read(&response)) { 651 GPR_ASSERT(response.payload().body() == 652 grpc::string(response_stream_sizes[i], '\0')); 653 ++i; 654 } 655 656 if (i < response_stream_sizes.size()) { 657 // stream->Read() failed before reading all the expected messages. This is 658 // most likely due to a connection failure 659 gpr_log(GPR_ERROR, 660 "DoHalfDuplex(): Responses read (i=%d) are less than the expected " 661 "number of messages response_stream_sizes.size() (%" PRIuPTR ")", 662 i, response_stream_sizes.size()); 663 return TransientFailureOrAbort(); 664 } 665 666 Status s = stream->Finish(); 667 if (!AssertStatusOk(s, context.debug_error_string())) { 668 return false; 669 } 670 671 gpr_log(GPR_DEBUG, "Half-duplex streaming rpc done."); 672 return true; 673 } 674 675 bool InteropClient::DoPingPong() { 676 gpr_log(GPR_DEBUG, "Sending Ping Pong streaming rpc ..."); 677 678 ClientContext context; 679 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest, 680 StreamingOutputCallResponse>> 681 stream(serviceStub_.Get()->FullDuplexCall(&context)); 682 683 StreamingOutputCallRequest request; 684 ResponseParameters* response_parameter = request.add_response_parameters(); 685 Payload* payload = request.mutable_payload(); 686 StreamingOutputCallResponse response; 687 688 for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) { 689 response_parameter->set_size(response_stream_sizes[i]); 690 payload->set_body(grpc::string(request_stream_sizes[i], '\0')); 691 692 if (!stream->Write(request)) { 693 gpr_log(GPR_ERROR, "DoPingPong(): stream->Write() failed. i: %d", i); 694 return TransientFailureOrAbort(); 695 } 696 697 if (!stream->Read(&response)) { 698 gpr_log(GPR_ERROR, "DoPingPong(): stream->Read() failed. i:%d", i); 699 return TransientFailureOrAbort(); 700 } 701 702 GPR_ASSERT(response.payload().body() == 703 grpc::string(response_stream_sizes[i], '\0')); 704 } 705 706 stream->WritesDone(); 707 708 GPR_ASSERT(!stream->Read(&response)); 709 710 Status s = stream->Finish(); 711 if (!AssertStatusOk(s, context.debug_error_string())) { 712 return false; 713 } 714 715 gpr_log(GPR_DEBUG, "Ping pong streaming done."); 716 return true; 717 } 718 719 bool InteropClient::DoCancelAfterBegin() { 720 gpr_log(GPR_DEBUG, "Sending request streaming rpc ..."); 721 722 ClientContext context; 723 StreamingInputCallRequest request; 724 StreamingInputCallResponse response; 725 726 std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream( 727 serviceStub_.Get()->StreamingInputCall(&context, &response)); 728 729 gpr_log(GPR_DEBUG, "Trying to cancel..."); 730 context.TryCancel(); 731 Status s = stream->Finish(); 732 733 if (!AssertStatusCode(s, StatusCode::CANCELLED, 734 context.debug_error_string())) { 735 return false; 736 } 737 738 gpr_log(GPR_DEBUG, "Canceling streaming done."); 739 return true; 740 } 741 742 bool InteropClient::DoCancelAfterFirstResponse() { 743 gpr_log(GPR_DEBUG, "Sending Ping Pong streaming rpc ..."); 744 745 ClientContext context; 746 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest, 747 StreamingOutputCallResponse>> 748 stream(serviceStub_.Get()->FullDuplexCall(&context)); 749 750 StreamingOutputCallRequest request; 751 ResponseParameters* response_parameter = request.add_response_parameters(); 752 response_parameter->set_size(31415); 753 request.mutable_payload()->set_body(grpc::string(27182, '\0')); 754 StreamingOutputCallResponse response; 755 756 if (!stream->Write(request)) { 757 gpr_log(GPR_ERROR, "DoCancelAfterFirstResponse(): stream->Write() failed"); 758 return TransientFailureOrAbort(); 759 } 760 761 if (!stream->Read(&response)) { 762 gpr_log(GPR_ERROR, "DoCancelAfterFirstResponse(): stream->Read failed"); 763 return TransientFailureOrAbort(); 764 } 765 GPR_ASSERT(response.payload().body() == grpc::string(31415, '\0')); 766 767 gpr_log(GPR_DEBUG, "Trying to cancel..."); 768 context.TryCancel(); 769 770 Status s = stream->Finish(); 771 gpr_log(GPR_DEBUG, "Canceling pingpong streaming done."); 772 return true; 773 } 774 775 bool InteropClient::DoTimeoutOnSleepingServer() { 776 gpr_log(GPR_DEBUG, 777 "Sending Ping Pong streaming rpc with a short deadline..."); 778 779 ClientContext context; 780 std::chrono::system_clock::time_point deadline = 781 std::chrono::system_clock::now() + std::chrono::milliseconds(1); 782 context.set_deadline(deadline); 783 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest, 784 StreamingOutputCallResponse>> 785 stream(serviceStub_.Get()->FullDuplexCall(&context)); 786 787 StreamingOutputCallRequest request; 788 request.mutable_payload()->set_body(grpc::string(27182, '\0')); 789 stream->Write(request); 790 791 Status s = stream->Finish(); 792 if (!AssertStatusCode(s, StatusCode::DEADLINE_EXCEEDED, 793 context.debug_error_string())) { 794 return false; 795 } 796 797 gpr_log(GPR_DEBUG, "Pingpong streaming timeout done."); 798 return true; 799 } 800 801 bool InteropClient::DoEmptyStream() { 802 gpr_log(GPR_DEBUG, "Starting empty_stream."); 803 804 ClientContext context; 805 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest, 806 StreamingOutputCallResponse>> 807 stream(serviceStub_.Get()->FullDuplexCall(&context)); 808 stream->WritesDone(); 809 StreamingOutputCallResponse response; 810 GPR_ASSERT(stream->Read(&response) == false); 811 812 Status s = stream->Finish(); 813 if (!AssertStatusOk(s, context.debug_error_string())) { 814 return false; 815 } 816 817 gpr_log(GPR_DEBUG, "empty_stream done."); 818 return true; 819 } 820 821 bool InteropClient::DoStatusWithMessage() { 822 gpr_log(GPR_DEBUG, 823 "Sending RPC with a request for status code 2 and message"); 824 825 const grpc::StatusCode test_code = grpc::StatusCode::UNKNOWN; 826 const grpc::string test_msg = "This is a test message"; 827 828 // Test UnaryCall. 829 ClientContext context; 830 SimpleRequest request; 831 SimpleResponse response; 832 EchoStatus* requested_status = request.mutable_response_status(); 833 requested_status->set_code(test_code); 834 requested_status->set_message(test_msg); 835 Status s = serviceStub_.Get()->UnaryCall(&context, request, &response); 836 if (!AssertStatusCode(s, grpc::StatusCode::UNKNOWN, 837 context.debug_error_string())) { 838 return false; 839 } 840 GPR_ASSERT(s.error_message() == test_msg); 841 842 // Test FullDuplexCall. 843 ClientContext stream_context; 844 std::shared_ptr<ClientReaderWriter<StreamingOutputCallRequest, 845 StreamingOutputCallResponse>> 846 stream(serviceStub_.Get()->FullDuplexCall(&stream_context)); 847 StreamingOutputCallRequest streaming_request; 848 requested_status = streaming_request.mutable_response_status(); 849 requested_status->set_code(test_code); 850 requested_status->set_message(test_msg); 851 stream->Write(streaming_request); 852 stream->WritesDone(); 853 StreamingOutputCallResponse streaming_response; 854 while (stream->Read(&streaming_response)) 855 ; 856 s = stream->Finish(); 857 if (!AssertStatusCode(s, grpc::StatusCode::UNKNOWN, 858 context.debug_error_string())) { 859 return false; 860 } 861 GPR_ASSERT(s.error_message() == test_msg); 862 863 gpr_log(GPR_DEBUG, "Done testing Status and Message"); 864 return true; 865 } 866 867 bool InteropClient::DoCacheableUnary() { 868 gpr_log(GPR_DEBUG, "Sending RPC with cacheable response"); 869 870 // Create request with current timestamp 871 gpr_timespec ts = gpr_now(GPR_CLOCK_PRECISE); 872 std::string timestamp = 873 std::to_string(static_cast<long long unsigned>(ts.tv_nsec)); 874 SimpleRequest request; 875 request.mutable_payload()->set_body(timestamp.c_str(), timestamp.size()); 876 877 // Request 1 878 ClientContext context1; 879 SimpleResponse response1; 880 context1.set_cacheable(true); 881 // Add fake user IP since some proxy's (GFE) won't cache requests from 882 // localhost. 883 context1.AddMetadata("x-user-ip", "1.2.3.4"); 884 Status s1 = 885 serviceStub_.Get()->CacheableUnaryCall(&context1, request, &response1); 886 if (!AssertStatusOk(s1, context1.debug_error_string())) { 887 return false; 888 } 889 gpr_log(GPR_DEBUG, "response 1 payload: %s", 890 response1.payload().body().c_str()); 891 892 // Request 2 893 ClientContext context2; 894 SimpleResponse response2; 895 context2.set_cacheable(true); 896 context2.AddMetadata("x-user-ip", "1.2.3.4"); 897 Status s2 = 898 serviceStub_.Get()->CacheableUnaryCall(&context2, request, &response2); 899 if (!AssertStatusOk(s2, context2.debug_error_string())) { 900 return false; 901 } 902 gpr_log(GPR_DEBUG, "response 2 payload: %s", 903 response2.payload().body().c_str()); 904 905 // Check that the body is same for both requests. It will be the same if the 906 // second response is a cached copy of the first response 907 GPR_ASSERT(response2.payload().body() == response1.payload().body()); 908 909 // Request 3 910 // Modify the request body so it will not get a cache hit 911 ts = gpr_now(GPR_CLOCK_PRECISE); 912 timestamp = std::to_string(static_cast<long long unsigned>(ts.tv_nsec)); 913 SimpleRequest request1; 914 request1.mutable_payload()->set_body(timestamp.c_str(), timestamp.size()); 915 ClientContext context3; 916 SimpleResponse response3; 917 context3.set_cacheable(true); 918 context3.AddMetadata("x-user-ip", "1.2.3.4"); 919 Status s3 = 920 serviceStub_.Get()->CacheableUnaryCall(&context3, request1, &response3); 921 if (!AssertStatusOk(s3, context3.debug_error_string())) { 922 return false; 923 } 924 gpr_log(GPR_DEBUG, "response 3 payload: %s", 925 response3.payload().body().c_str()); 926 927 // Check that the response is different from the previous response. 928 GPR_ASSERT(response3.payload().body() != response1.payload().body()); 929 return true; 930 } 931 932 bool InteropClient::DoCustomMetadata() { 933 const grpc::string kEchoInitialMetadataKey("x-grpc-test-echo-initial"); 934 const grpc::string kInitialMetadataValue("test_initial_metadata_value"); 935 const grpc::string kEchoTrailingBinMetadataKey( 936 "x-grpc-test-echo-trailing-bin"); 937 const grpc::string kTrailingBinValue("\x0a\x0b\x0a\x0b\x0a\x0b"); 938 ; 939 940 { 941 gpr_log(GPR_DEBUG, "Sending RPC with custom metadata"); 942 ClientContext context; 943 context.AddMetadata(kEchoInitialMetadataKey, kInitialMetadataValue); 944 context.AddMetadata(kEchoTrailingBinMetadataKey, kTrailingBinValue); 945 SimpleRequest request; 946 SimpleResponse response; 947 request.set_response_size(kLargeResponseSize); 948 grpc::string payload(kLargeRequestSize, '\0'); 949 request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize); 950 951 Status s = serviceStub_.Get()->UnaryCall(&context, request, &response); 952 if (!AssertStatusOk(s, context.debug_error_string())) { 953 return false; 954 } 955 956 const auto& server_initial_metadata = context.GetServerInitialMetadata(); 957 auto iter = server_initial_metadata.find(kEchoInitialMetadataKey); 958 GPR_ASSERT(iter != server_initial_metadata.end()); 959 GPR_ASSERT(iter->second == kInitialMetadataValue); 960 const auto& server_trailing_metadata = context.GetServerTrailingMetadata(); 961 iter = server_trailing_metadata.find(kEchoTrailingBinMetadataKey); 962 GPR_ASSERT(iter != server_trailing_metadata.end()); 963 GPR_ASSERT(grpc::string(iter->second.begin(), iter->second.end()) == 964 kTrailingBinValue); 965 966 gpr_log(GPR_DEBUG, "Done testing RPC with custom metadata"); 967 } 968 969 { 970 gpr_log(GPR_DEBUG, "Sending stream with custom metadata"); 971 ClientContext context; 972 context.AddMetadata(kEchoInitialMetadataKey, kInitialMetadataValue); 973 context.AddMetadata(kEchoTrailingBinMetadataKey, kTrailingBinValue); 974 std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest, 975 StreamingOutputCallResponse>> 976 stream(serviceStub_.Get()->FullDuplexCall(&context)); 977 978 StreamingOutputCallRequest request; 979 ResponseParameters* response_parameter = request.add_response_parameters(); 980 response_parameter->set_size(kLargeResponseSize); 981 grpc::string payload(kLargeRequestSize, '\0'); 982 request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize); 983 StreamingOutputCallResponse response; 984 985 if (!stream->Write(request)) { 986 gpr_log(GPR_ERROR, "DoCustomMetadata(): stream->Write() failed"); 987 return TransientFailureOrAbort(); 988 } 989 990 stream->WritesDone(); 991 992 if (!stream->Read(&response)) { 993 gpr_log(GPR_ERROR, "DoCustomMetadata(): stream->Read() failed"); 994 return TransientFailureOrAbort(); 995 } 996 997 GPR_ASSERT(response.payload().body() == 998 grpc::string(kLargeResponseSize, '\0')); 999 1000 GPR_ASSERT(!stream->Read(&response)); 1001 1002 Status s = stream->Finish(); 1003 if (!AssertStatusOk(s, context.debug_error_string())) { 1004 return false; 1005 } 1006 1007 const auto& server_initial_metadata = context.GetServerInitialMetadata(); 1008 auto iter = server_initial_metadata.find(kEchoInitialMetadataKey); 1009 GPR_ASSERT(iter != server_initial_metadata.end()); 1010 GPR_ASSERT(iter->second == kInitialMetadataValue); 1011 const auto& server_trailing_metadata = context.GetServerTrailingMetadata(); 1012 iter = server_trailing_metadata.find(kEchoTrailingBinMetadataKey); 1013 GPR_ASSERT(iter != server_trailing_metadata.end()); 1014 GPR_ASSERT(grpc::string(iter->second.begin(), iter->second.end()) == 1015 kTrailingBinValue); 1016 1017 gpr_log(GPR_DEBUG, "Done testing stream with custom metadata"); 1018 } 1019 1020 return true; 1021 } 1022 1023 bool InteropClient::DoRpcSoakTest(int32_t soak_iterations) { 1024 gpr_log(GPR_DEBUG, "Sending %d RPCs...", soak_iterations); 1025 GPR_ASSERT(soak_iterations > 0); 1026 SimpleRequest request; 1027 SimpleResponse response; 1028 for (int i = 0; i < soak_iterations; ++i) { 1029 if (!PerformLargeUnary(&request, &response)) { 1030 gpr_log(GPR_ERROR, "rpc_soak test failed on iteration %d", i); 1031 return false; 1032 } 1033 } 1034 gpr_log(GPR_DEBUG, "rpc_soak test done."); 1035 return true; 1036 } 1037 1038 bool InteropClient::DoChannelSoakTest(int32_t soak_iterations) { 1039 gpr_log(GPR_DEBUG, "Sending %d RPCs, tearing down the channel each time...", 1040 soak_iterations); 1041 GPR_ASSERT(soak_iterations > 0); 1042 SimpleRequest request; 1043 SimpleResponse response; 1044 for (int i = 0; i < soak_iterations; ++i) { 1045 serviceStub_.ResetChannel(); 1046 if (!PerformLargeUnary(&request, &response)) { 1047 gpr_log(GPR_ERROR, "channel_soak test failed on iteration %d", i); 1048 return false; 1049 } 1050 } 1051 gpr_log(GPR_DEBUG, "channel_soak test done."); 1052 return true; 1053 } 1054 1055 bool InteropClient::DoLongLivedChannelTest(int32_t soak_iterations, 1056 int32_t iteration_interval) { 1057 gpr_log(GPR_DEBUG, "Sending %d RPCs...", soak_iterations); 1058 GPR_ASSERT(soak_iterations > 0); 1059 GPR_ASSERT(iteration_interval > 0); 1060 SimpleRequest request; 1061 SimpleResponse response; 1062 int num_failures = 0; 1063 for (int i = 0; i < soak_iterations; ++i) { 1064 gpr_log(GPR_DEBUG, "Sending RPC number %d...", i); 1065 if (!PerformLargeUnary(&request, &response)) { 1066 gpr_log(GPR_ERROR, "Iteration %d failed.", i); 1067 num_failures++; 1068 } 1069 gpr_sleep_until( 1070 gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), 1071 gpr_time_from_seconds(iteration_interval, GPR_TIMESPAN))); 1072 } 1073 if (num_failures == 0) { 1074 gpr_log(GPR_DEBUG, "long_lived_channel test done."); 1075 return true; 1076 } else { 1077 gpr_log(GPR_DEBUG, "long_lived_channel test failed with %d rpc failures.", 1078 num_failures); 1079 return false; 1080 } 1081 } 1082 1083 bool InteropClient::DoUnimplementedService() { 1084 gpr_log(GPR_DEBUG, "Sending a request for an unimplemented service..."); 1085 1086 Empty request; 1087 Empty response; 1088 ClientContext context; 1089 1090 UnimplementedService::Stub* stub = serviceStub_.GetUnimplementedServiceStub(); 1091 1092 Status s = stub->UnimplementedCall(&context, request, &response); 1093 1094 if (!AssertStatusCode(s, StatusCode::UNIMPLEMENTED, 1095 context.debug_error_string())) { 1096 return false; 1097 } 1098 1099 gpr_log(GPR_DEBUG, "unimplemented service done."); 1100 return true; 1101 } 1102 1103 bool InteropClient::DoUnimplementedMethod() { 1104 gpr_log(GPR_DEBUG, "Sending a request for an unimplemented rpc..."); 1105 1106 Empty request; 1107 Empty response; 1108 ClientContext context; 1109 1110 Status s = 1111 serviceStub_.Get()->UnimplementedCall(&context, request, &response); 1112 1113 if (!AssertStatusCode(s, StatusCode::UNIMPLEMENTED, 1114 context.debug_error_string())) { 1115 return false; 1116 } 1117 1118 gpr_log(GPR_DEBUG, "unimplemented rpc done."); 1119 return true; 1120 } 1121 1122 } // namespace testing 1123 } // namespace grpc 1124