1 /* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #include "test/cpp/util/cli_call.h" 20 21 #include <iostream> 22 #include <utility> 23 24 #include <grpc/grpc.h> 25 #include <grpc/slice.h> 26 #include <grpc/support/log.h> 27 #include <grpcpp/channel.h> 28 #include <grpcpp/client_context.h> 29 #include <grpcpp/support/byte_buffer.h> 30 31 namespace grpc { 32 namespace testing { 33 namespace { 34 void* tag(int i) { return (void*)static_cast<intptr_t>(i); } 35 } // namespace 36 37 Status CliCall::Call(std::shared_ptr<grpc::Channel> channel, 38 const grpc::string& method, const grpc::string& request, 39 grpc::string* response, 40 const OutgoingMetadataContainer& metadata, 41 IncomingMetadataContainer* server_initial_metadata, 42 IncomingMetadataContainer* server_trailing_metadata) { 43 CliCall call(std::move(channel), method, metadata); 44 call.Write(request); 45 call.WritesDone(); 46 if (!call.Read(response, server_initial_metadata)) { 47 fprintf(stderr, "Failed to read response.\n"); 48 } 49 return call.Finish(server_trailing_metadata); 50 } 51 52 CliCall::CliCall(const std::shared_ptr<grpc::Channel>& channel, 53 const grpc::string& method, 54 const OutgoingMetadataContainer& metadata) 55 : stub_(new grpc::GenericStub(channel)) { 56 gpr_mu_init(&write_mu_); 57 gpr_cv_init(&write_cv_); 58 if (!metadata.empty()) { 59 for (OutgoingMetadataContainer::const_iterator iter = metadata.begin(); 60 iter != metadata.end(); ++iter) { 61 ctx_.AddMetadata(iter->first, iter->second); 62 } 63 } 64 call_ = stub_->PrepareCall(&ctx_, method, &cq_); 65 call_->StartCall(tag(1)); 66 void* got_tag; 67 bool ok; 68 cq_.Next(&got_tag, &ok); 69 GPR_ASSERT(ok); 70 } 71 72 CliCall::~CliCall() { 73 gpr_cv_destroy(&write_cv_); 74 gpr_mu_destroy(&write_mu_); 75 } 76 77 void CliCall::Write(const grpc::string& request) { 78 void* got_tag; 79 bool ok; 80 81 gpr_slice s = gpr_slice_from_copied_buffer(request.data(), request.size()); 82 grpc::Slice req_slice(s, grpc::Slice::STEAL_REF); 83 grpc::ByteBuffer send_buffer(&req_slice, 1); 84 call_->Write(send_buffer, tag(2)); 85 cq_.Next(&got_tag, &ok); 86 GPR_ASSERT(ok); 87 } 88 89 bool CliCall::Read(grpc::string* response, 90 IncomingMetadataContainer* server_initial_metadata) { 91 void* got_tag; 92 bool ok; 93 94 grpc::ByteBuffer recv_buffer; 95 call_->Read(&recv_buffer, tag(3)); 96 97 if (!cq_.Next(&got_tag, &ok) || !ok) { 98 return false; 99 } 100 std::vector<grpc::Slice> slices; 101 GPR_ASSERT(recv_buffer.Dump(&slices).ok()); 102 103 response->clear(); 104 for (size_t i = 0; i < slices.size(); i++) { 105 response->append(reinterpret_cast<const char*>(slices[i].begin()), 106 slices[i].size()); 107 } 108 if (server_initial_metadata) { 109 *server_initial_metadata = ctx_.GetServerInitialMetadata(); 110 } 111 return true; 112 } 113 114 void CliCall::WritesDone() { 115 void* got_tag; 116 bool ok; 117 118 call_->WritesDone(tag(4)); 119 cq_.Next(&got_tag, &ok); 120 GPR_ASSERT(ok); 121 } 122 123 void CliCall::WriteAndWait(const grpc::string& request) { 124 grpc::Slice req_slice(request); 125 grpc::ByteBuffer send_buffer(&req_slice, 1); 126 127 gpr_mu_lock(&write_mu_); 128 call_->Write(send_buffer, tag(2)); 129 write_done_ = false; 130 while (!write_done_) { 131 gpr_cv_wait(&write_cv_, &write_mu_, gpr_inf_future(GPR_CLOCK_MONOTONIC)); 132 } 133 gpr_mu_unlock(&write_mu_); 134 } 135 136 void CliCall::WritesDoneAndWait() { 137 gpr_mu_lock(&write_mu_); 138 call_->WritesDone(tag(4)); 139 write_done_ = false; 140 while (!write_done_) { 141 gpr_cv_wait(&write_cv_, &write_mu_, gpr_inf_future(GPR_CLOCK_MONOTONIC)); 142 } 143 gpr_mu_unlock(&write_mu_); 144 } 145 146 bool CliCall::ReadAndMaybeNotifyWrite( 147 grpc::string* response, 148 IncomingMetadataContainer* server_initial_metadata) { 149 void* got_tag; 150 bool ok; 151 grpc::ByteBuffer recv_buffer; 152 153 call_->Read(&recv_buffer, tag(3)); 154 bool cq_result = cq_.Next(&got_tag, &ok); 155 156 while (got_tag != tag(3)) { 157 gpr_mu_lock(&write_mu_); 158 write_done_ = true; 159 gpr_cv_signal(&write_cv_); 160 gpr_mu_unlock(&write_mu_); 161 162 cq_result = cq_.Next(&got_tag, &ok); 163 if (got_tag == tag(2)) { 164 GPR_ASSERT(ok); 165 } 166 } 167 168 if (!cq_result || !ok) { 169 // If the RPC is ended on the server side, we should still wait for the 170 // pending write on the client side to be done. 171 if (!ok) { 172 gpr_mu_lock(&write_mu_); 173 if (!write_done_) { 174 cq_.Next(&got_tag, &ok); 175 GPR_ASSERT(got_tag != tag(2)); 176 write_done_ = true; 177 gpr_cv_signal(&write_cv_); 178 } 179 gpr_mu_unlock(&write_mu_); 180 } 181 return false; 182 } 183 184 std::vector<grpc::Slice> slices; 185 GPR_ASSERT(recv_buffer.Dump(&slices).ok()); 186 response->clear(); 187 for (size_t i = 0; i < slices.size(); i++) { 188 response->append(reinterpret_cast<const char*>(slices[i].begin()), 189 slices[i].size()); 190 } 191 if (server_initial_metadata) { 192 *server_initial_metadata = ctx_.GetServerInitialMetadata(); 193 } 194 return true; 195 } 196 197 Status CliCall::Finish(IncomingMetadataContainer* server_trailing_metadata) { 198 void* got_tag; 199 bool ok; 200 grpc::Status status; 201 202 call_->Finish(&status, tag(5)); 203 cq_.Next(&got_tag, &ok); 204 GPR_ASSERT(ok); 205 if (server_trailing_metadata) { 206 *server_trailing_metadata = ctx_.GetServerTrailingMetadata(); 207 } 208 209 return status; 210 } 211 212 } // namespace testing 213 } // namespace grpc 214