Home | History | Annotate | Download | only in util
      1 /*
      2  *
      3  * Copyright 2015 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #include "test/cpp/util/cli_call.h"
     20 
     21 #include <iostream>
     22 #include <utility>
     23 
     24 #include <grpc/grpc.h>
     25 #include <grpc/slice.h>
     26 #include <grpc/support/log.h>
     27 #include <grpcpp/channel.h>
     28 #include <grpcpp/client_context.h>
     29 #include <grpcpp/support/byte_buffer.h>
     30 
     31 namespace grpc {
     32 namespace testing {
     33 namespace {
     34 void* tag(int i) { return (void*)static_cast<intptr_t>(i); }
     35 }  // namespace
     36 
     37 Status CliCall::Call(std::shared_ptr<grpc::Channel> channel,
     38                      const grpc::string& method, const grpc::string& request,
     39                      grpc::string* response,
     40                      const OutgoingMetadataContainer& metadata,
     41                      IncomingMetadataContainer* server_initial_metadata,
     42                      IncomingMetadataContainer* server_trailing_metadata) {
     43   CliCall call(std::move(channel), method, metadata);
     44   call.Write(request);
     45   call.WritesDone();
     46   if (!call.Read(response, server_initial_metadata)) {
     47     fprintf(stderr, "Failed to read response.\n");
     48   }
     49   return call.Finish(server_trailing_metadata);
     50 }
     51 
     52 CliCall::CliCall(const std::shared_ptr<grpc::Channel>& channel,
     53                  const grpc::string& method,
     54                  const OutgoingMetadataContainer& metadata)
     55     : stub_(new grpc::GenericStub(channel)) {
     56   gpr_mu_init(&write_mu_);
     57   gpr_cv_init(&write_cv_);
     58   if (!metadata.empty()) {
     59     for (OutgoingMetadataContainer::const_iterator iter = metadata.begin();
     60          iter != metadata.end(); ++iter) {
     61       ctx_.AddMetadata(iter->first, iter->second);
     62     }
     63   }
     64   call_ = stub_->PrepareCall(&ctx_, method, &cq_);
     65   call_->StartCall(tag(1));
     66   void* got_tag;
     67   bool ok;
     68   cq_.Next(&got_tag, &ok);
     69   GPR_ASSERT(ok);
     70 }
     71 
     72 CliCall::~CliCall() {
     73   gpr_cv_destroy(&write_cv_);
     74   gpr_mu_destroy(&write_mu_);
     75 }
     76 
     77 void CliCall::Write(const grpc::string& request) {
     78   void* got_tag;
     79   bool ok;
     80 
     81   gpr_slice s = gpr_slice_from_copied_buffer(request.data(), request.size());
     82   grpc::Slice req_slice(s, grpc::Slice::STEAL_REF);
     83   grpc::ByteBuffer send_buffer(&req_slice, 1);
     84   call_->Write(send_buffer, tag(2));
     85   cq_.Next(&got_tag, &ok);
     86   GPR_ASSERT(ok);
     87 }
     88 
     89 bool CliCall::Read(grpc::string* response,
     90                    IncomingMetadataContainer* server_initial_metadata) {
     91   void* got_tag;
     92   bool ok;
     93 
     94   grpc::ByteBuffer recv_buffer;
     95   call_->Read(&recv_buffer, tag(3));
     96 
     97   if (!cq_.Next(&got_tag, &ok) || !ok) {
     98     return false;
     99   }
    100   std::vector<grpc::Slice> slices;
    101   GPR_ASSERT(recv_buffer.Dump(&slices).ok());
    102 
    103   response->clear();
    104   for (size_t i = 0; i < slices.size(); i++) {
    105     response->append(reinterpret_cast<const char*>(slices[i].begin()),
    106                      slices[i].size());
    107   }
    108   if (server_initial_metadata) {
    109     *server_initial_metadata = ctx_.GetServerInitialMetadata();
    110   }
    111   return true;
    112 }
    113 
    114 void CliCall::WritesDone() {
    115   void* got_tag;
    116   bool ok;
    117 
    118   call_->WritesDone(tag(4));
    119   cq_.Next(&got_tag, &ok);
    120   GPR_ASSERT(ok);
    121 }
    122 
    123 void CliCall::WriteAndWait(const grpc::string& request) {
    124   grpc::Slice req_slice(request);
    125   grpc::ByteBuffer send_buffer(&req_slice, 1);
    126 
    127   gpr_mu_lock(&write_mu_);
    128   call_->Write(send_buffer, tag(2));
    129   write_done_ = false;
    130   while (!write_done_) {
    131     gpr_cv_wait(&write_cv_, &write_mu_, gpr_inf_future(GPR_CLOCK_MONOTONIC));
    132   }
    133   gpr_mu_unlock(&write_mu_);
    134 }
    135 
    136 void CliCall::WritesDoneAndWait() {
    137   gpr_mu_lock(&write_mu_);
    138   call_->WritesDone(tag(4));
    139   write_done_ = false;
    140   while (!write_done_) {
    141     gpr_cv_wait(&write_cv_, &write_mu_, gpr_inf_future(GPR_CLOCK_MONOTONIC));
    142   }
    143   gpr_mu_unlock(&write_mu_);
    144 }
    145 
    146 bool CliCall::ReadAndMaybeNotifyWrite(
    147     grpc::string* response,
    148     IncomingMetadataContainer* server_initial_metadata) {
    149   void* got_tag;
    150   bool ok;
    151   grpc::ByteBuffer recv_buffer;
    152 
    153   call_->Read(&recv_buffer, tag(3));
    154   bool cq_result = cq_.Next(&got_tag, &ok);
    155 
    156   while (got_tag != tag(3)) {
    157     gpr_mu_lock(&write_mu_);
    158     write_done_ = true;
    159     gpr_cv_signal(&write_cv_);
    160     gpr_mu_unlock(&write_mu_);
    161 
    162     cq_result = cq_.Next(&got_tag, &ok);
    163     if (got_tag == tag(2)) {
    164       GPR_ASSERT(ok);
    165     }
    166   }
    167 
    168   if (!cq_result || !ok) {
    169     // If the RPC is ended on the server side, we should still wait for the
    170     // pending write on the client side to be done.
    171     if (!ok) {
    172       gpr_mu_lock(&write_mu_);
    173       if (!write_done_) {
    174         cq_.Next(&got_tag, &ok);
    175         GPR_ASSERT(got_tag != tag(2));
    176         write_done_ = true;
    177         gpr_cv_signal(&write_cv_);
    178       }
    179       gpr_mu_unlock(&write_mu_);
    180     }
    181     return false;
    182   }
    183 
    184   std::vector<grpc::Slice> slices;
    185   GPR_ASSERT(recv_buffer.Dump(&slices).ok());
    186   response->clear();
    187   for (size_t i = 0; i < slices.size(); i++) {
    188     response->append(reinterpret_cast<const char*>(slices[i].begin()),
    189                      slices[i].size());
    190   }
    191   if (server_initial_metadata) {
    192     *server_initial_metadata = ctx_.GetServerInitialMetadata();
    193   }
    194   return true;
    195 }
    196 
    197 Status CliCall::Finish(IncomingMetadataContainer* server_trailing_metadata) {
    198   void* got_tag;
    199   bool ok;
    200   grpc::Status status;
    201 
    202   call_->Finish(&status, tag(5));
    203   cq_.Next(&got_tag, &ok);
    204   GPR_ASSERT(ok);
    205   if (server_trailing_metadata) {
    206     *server_trailing_metadata = ctx_.GetServerTrailingMetadata();
    207   }
    208 
    209   return status;
    210 }
    211 
    212 }  // namespace testing
    213 }  // namespace grpc
    214