Home | History | Annotate | Download | only in codegen
      1 /*
      2  *
      3  * Copyright 2015 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #ifndef GRPCPP_IMPL_CODEGEN_CALL_H
     20 #define GRPCPP_IMPL_CODEGEN_CALL_H
     21 
     22 #include <assert.h>
     23 #include <cstring>
     24 #include <functional>
     25 #include <map>
     26 #include <memory>
     27 
     28 #include <grpcpp/impl/codegen/byte_buffer.h>
     29 #include <grpcpp/impl/codegen/call_hook.h>
     30 #include <grpcpp/impl/codegen/client_context.h>
     31 #include <grpcpp/impl/codegen/completion_queue_tag.h>
     32 #include <grpcpp/impl/codegen/config.h>
     33 #include <grpcpp/impl/codegen/core_codegen_interface.h>
     34 #include <grpcpp/impl/codegen/serialization_traits.h>
     35 #include <grpcpp/impl/codegen/slice.h>
     36 #include <grpcpp/impl/codegen/status.h>
     37 #include <grpcpp/impl/codegen/string_ref.h>
     38 
     39 #include <grpc/impl/codegen/atm.h>
     40 #include <grpc/impl/codegen/compression_types.h>
     41 #include <grpc/impl/codegen/grpc_types.h>
     42 
     43 namespace grpc {
     44 
     45 class ByteBuffer;
     46 class CompletionQueue;
     47 extern CoreCodegenInterface* g_core_codegen_interface;
     48 
     49 namespace internal {
     50 class Call;
     51 class CallHook;
     52 
     53 // TODO(yangg) if the map is changed before we send, the pointers will be a
     54 // mess. Make sure it does not happen.
     55 inline grpc_metadata* FillMetadataArray(
     56     const std::multimap<grpc::string, grpc::string>& metadata,
     57     size_t* metadata_count, const grpc::string& optional_error_details) {
     58   *metadata_count = metadata.size() + (optional_error_details.empty() ? 0 : 1);
     59   if (*metadata_count == 0) {
     60     return nullptr;
     61   }
     62   grpc_metadata* metadata_array =
     63       (grpc_metadata*)(g_core_codegen_interface->gpr_malloc(
     64           (*metadata_count) * sizeof(grpc_metadata)));
     65   size_t i = 0;
     66   for (auto iter = metadata.cbegin(); iter != metadata.cend(); ++iter, ++i) {
     67     metadata_array[i].key = SliceReferencingString(iter->first);
     68     metadata_array[i].value = SliceReferencingString(iter->second);
     69   }
     70   if (!optional_error_details.empty()) {
     71     metadata_array[i].key =
     72         g_core_codegen_interface->grpc_slice_from_static_buffer(
     73             kBinaryErrorDetailsKey, sizeof(kBinaryErrorDetailsKey) - 1);
     74     metadata_array[i].value = SliceReferencingString(optional_error_details);
     75   }
     76   return metadata_array;
     77 }
     78 }  // namespace internal
     79 
     80 /// Per-message write options.
     81 class WriteOptions {
     82  public:
     83   WriteOptions() : flags_(0), last_message_(false) {}
     84   WriteOptions(const WriteOptions& other)
     85       : flags_(other.flags_), last_message_(other.last_message_) {}
     86 
     87   /// Clear all flags.
     88   inline void Clear() { flags_ = 0; }
     89 
     90   /// Returns raw flags bitset.
     91   inline uint32_t flags() const { return flags_; }
     92 
     93   /// Sets flag for the disabling of compression for the next message write.
     94   ///
     95   /// \sa GRPC_WRITE_NO_COMPRESS
     96   inline WriteOptions& set_no_compression() {
     97     SetBit(GRPC_WRITE_NO_COMPRESS);
     98     return *this;
     99   }
    100 
    101   /// Clears flag for the disabling of compression for the next message write.
    102   ///
    103   /// \sa GRPC_WRITE_NO_COMPRESS
    104   inline WriteOptions& clear_no_compression() {
    105     ClearBit(GRPC_WRITE_NO_COMPRESS);
    106     return *this;
    107   }
    108 
    109   /// Get value for the flag indicating whether compression for the next
    110   /// message write is forcefully disabled.
    111   ///
    112   /// \sa GRPC_WRITE_NO_COMPRESS
    113   inline bool get_no_compression() const {
    114     return GetBit(GRPC_WRITE_NO_COMPRESS);
    115   }
    116 
    117   /// Sets flag indicating that the write may be buffered and need not go out on
    118   /// the wire immediately.
    119   ///
    120   /// \sa GRPC_WRITE_BUFFER_HINT
    121   inline WriteOptions& set_buffer_hint() {
    122     SetBit(GRPC_WRITE_BUFFER_HINT);
    123     return *this;
    124   }
    125 
    126   /// Clears flag indicating that the write may be buffered and need not go out
    127   /// on the wire immediately.
    128   ///
    129   /// \sa GRPC_WRITE_BUFFER_HINT
    130   inline WriteOptions& clear_buffer_hint() {
    131     ClearBit(GRPC_WRITE_BUFFER_HINT);
    132     return *this;
    133   }
    134 
    135   /// Get value for the flag indicating that the write may be buffered and need
    136   /// not go out on the wire immediately.
    137   ///
    138   /// \sa GRPC_WRITE_BUFFER_HINT
    139   inline bool get_buffer_hint() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
    140 
    141   /// corked bit: aliases set_buffer_hint currently, with the intent that
    142   /// set_buffer_hint will be removed in the future
    143   inline WriteOptions& set_corked() {
    144     SetBit(GRPC_WRITE_BUFFER_HINT);
    145     return *this;
    146   }
    147 
    148   inline WriteOptions& clear_corked() {
    149     ClearBit(GRPC_WRITE_BUFFER_HINT);
    150     return *this;
    151   }
    152 
    153   inline bool is_corked() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
    154 
    155   /// last-message bit: indicates this is the last message in a stream
    156   /// client-side:  makes Write the equivalent of performing Write, WritesDone
    157   /// in a single step
    158   /// server-side:  hold the Write until the service handler returns (sync api)
    159   /// or until Finish is called (async api)
    160   inline WriteOptions& set_last_message() {
    161     last_message_ = true;
    162     return *this;
    163   }
    164 
    165   /// Clears flag indicating that this is the last message in a stream,
    166   /// disabling coalescing.
    167   inline WriteOptions& clear_last_message() {
    168     last_message_ = false;
    169     return *this;
    170   }
    171 
    172   /// Guarantee that all bytes have been written to the socket before completing
    173   /// this write (usually writes are completed when they pass flow control).
    174   inline WriteOptions& set_write_through() {
    175     SetBit(GRPC_WRITE_THROUGH);
    176     return *this;
    177   }
    178 
    179   inline bool is_write_through() const { return GetBit(GRPC_WRITE_THROUGH); }
    180 
    181   /// Get value for the flag indicating that this is the last message, and
    182   /// should be coalesced with trailing metadata.
    183   ///
    184   /// \sa GRPC_WRITE_LAST_MESSAGE
    185   bool is_last_message() const { return last_message_; }
    186 
    187   WriteOptions& operator=(const WriteOptions& rhs) {
    188     flags_ = rhs.flags_;
    189     return *this;
    190   }
    191 
    192  private:
    193   void SetBit(const uint32_t mask) { flags_ |= mask; }
    194 
    195   void ClearBit(const uint32_t mask) { flags_ &= ~mask; }
    196 
    197   bool GetBit(const uint32_t mask) const { return (flags_ & mask) != 0; }
    198 
    199   uint32_t flags_;
    200   bool last_message_;
    201 };
    202 
    203 namespace internal {
    204 /// Default argument for CallOpSet. I is unused by the class, but can be
    205 /// used for generating multiple names for the same thing.
    206 template <int I>
    207 class CallNoOp {
    208  protected:
    209   void AddOp(grpc_op* ops, size_t* nops) {}
    210   void FinishOp(bool* status) {}
    211 };
    212 
    213 class CallOpSendInitialMetadata {
    214  public:
    215   CallOpSendInitialMetadata() : send_(false) {
    216     maybe_compression_level_.is_set = false;
    217   }
    218 
    219   void SendInitialMetadata(
    220       const std::multimap<grpc::string, grpc::string>& metadata,
    221       uint32_t flags) {
    222     maybe_compression_level_.is_set = false;
    223     send_ = true;
    224     flags_ = flags;
    225     initial_metadata_ =
    226         FillMetadataArray(metadata, &initial_metadata_count_, "");
    227   }
    228 
    229   void set_compression_level(grpc_compression_level level) {
    230     maybe_compression_level_.is_set = true;
    231     maybe_compression_level_.level = level;
    232   }
    233 
    234  protected:
    235   void AddOp(grpc_op* ops, size_t* nops) {
    236     if (!send_) return;
    237     grpc_op* op = &ops[(*nops)++];
    238     op->op = GRPC_OP_SEND_INITIAL_METADATA;
    239     op->flags = flags_;
    240     op->reserved = NULL;
    241     op->data.send_initial_metadata.count = initial_metadata_count_;
    242     op->data.send_initial_metadata.metadata = initial_metadata_;
    243     op->data.send_initial_metadata.maybe_compression_level.is_set =
    244         maybe_compression_level_.is_set;
    245     if (maybe_compression_level_.is_set) {
    246       op->data.send_initial_metadata.maybe_compression_level.level =
    247           maybe_compression_level_.level;
    248     }
    249   }
    250   void FinishOp(bool* status) {
    251     if (!send_) return;
    252     g_core_codegen_interface->gpr_free(initial_metadata_);
    253     send_ = false;
    254   }
    255 
    256   bool send_;
    257   uint32_t flags_;
    258   size_t initial_metadata_count_;
    259   grpc_metadata* initial_metadata_;
    260   struct {
    261     bool is_set;
    262     grpc_compression_level level;
    263   } maybe_compression_level_;
    264 };
    265 
    266 class CallOpSendMessage {
    267  public:
    268   CallOpSendMessage() : send_buf_() {}
    269 
    270   /// Send \a message using \a options for the write. The \a options are cleared
    271   /// after use.
    272   template <class M>
    273   Status SendMessage(const M& message,
    274                      WriteOptions options) GRPC_MUST_USE_RESULT;
    275 
    276   template <class M>
    277   Status SendMessage(const M& message) GRPC_MUST_USE_RESULT;
    278 
    279  protected:
    280   void AddOp(grpc_op* ops, size_t* nops) {
    281     if (!send_buf_.Valid()) return;
    282     grpc_op* op = &ops[(*nops)++];
    283     op->op = GRPC_OP_SEND_MESSAGE;
    284     op->flags = write_options_.flags();
    285     op->reserved = NULL;
    286     op->data.send_message.send_message = send_buf_.c_buffer();
    287     // Flags are per-message: clear them after use.
    288     write_options_.Clear();
    289   }
    290   void FinishOp(bool* status) { send_buf_.Clear(); }
    291 
    292  private:
    293   ByteBuffer send_buf_;
    294   WriteOptions write_options_;
    295 };
    296 
    297 template <class M>
    298 Status CallOpSendMessage::SendMessage(const M& message, WriteOptions options) {
    299   write_options_ = options;
    300   bool own_buf;
    301   // TODO(vjpai): Remove the void below when possible
    302   // The void in the template parameter below should not be needed
    303   // (since it should be implicit) but is needed due to an observed
    304   // difference in behavior between clang and gcc for certain internal users
    305   Status result = SerializationTraits<M, void>::Serialize(
    306       message, send_buf_.bbuf_ptr(), &own_buf);
    307   if (!own_buf) {
    308     send_buf_.Duplicate();
    309   }
    310   return result;
    311 }
    312 
    313 template <class M>
    314 Status CallOpSendMessage::SendMessage(const M& message) {
    315   return SendMessage(message, WriteOptions());
    316 }
    317 
    318 template <class R>
    319 class CallOpRecvMessage {
    320  public:
    321   CallOpRecvMessage()
    322       : got_message(false),
    323         message_(nullptr),
    324         allow_not_getting_message_(false) {}
    325 
    326   void RecvMessage(R* message) { message_ = message; }
    327 
    328   // Do not change status if no message is received.
    329   void AllowNoMessage() { allow_not_getting_message_ = true; }
    330 
    331   bool got_message;
    332 
    333  protected:
    334   void AddOp(grpc_op* ops, size_t* nops) {
    335     if (message_ == nullptr) return;
    336     grpc_op* op = &ops[(*nops)++];
    337     op->op = GRPC_OP_RECV_MESSAGE;
    338     op->flags = 0;
    339     op->reserved = NULL;
    340     op->data.recv_message.recv_message = recv_buf_.c_buffer_ptr();
    341   }
    342 
    343   void FinishOp(bool* status) {
    344     if (message_ == nullptr) return;
    345     if (recv_buf_.Valid()) {
    346       if (*status) {
    347         got_message = *status =
    348             SerializationTraits<R>::Deserialize(recv_buf_.bbuf_ptr(), message_)
    349                 .ok();
    350         recv_buf_.Release();
    351       } else {
    352         got_message = false;
    353         recv_buf_.Clear();
    354       }
    355     } else {
    356       got_message = false;
    357       if (!allow_not_getting_message_) {
    358         *status = false;
    359       }
    360     }
    361     message_ = nullptr;
    362   }
    363 
    364  private:
    365   R* message_;
    366   ByteBuffer recv_buf_;
    367   bool allow_not_getting_message_;
    368 };
    369 
    370 class DeserializeFunc {
    371  public:
    372   virtual Status Deserialize(ByteBuffer* buf) = 0;
    373   virtual ~DeserializeFunc() {}
    374 };
    375 
    376 template <class R>
    377 class DeserializeFuncType final : public DeserializeFunc {
    378  public:
    379   DeserializeFuncType(R* message) : message_(message) {}
    380   Status Deserialize(ByteBuffer* buf) override {
    381     return SerializationTraits<R>::Deserialize(buf->bbuf_ptr(), message_);
    382   }
    383 
    384   ~DeserializeFuncType() override {}
    385 
    386  private:
    387   R* message_;  // Not a managed pointer because management is external to this
    388 };
    389 
    390 class CallOpGenericRecvMessage {
    391  public:
    392   CallOpGenericRecvMessage()
    393       : got_message(false), allow_not_getting_message_(false) {}
    394 
    395   template <class R>
    396   void RecvMessage(R* message) {
    397     // Use an explicit base class pointer to avoid resolution error in the
    398     // following unique_ptr::reset for some old implementations.
    399     DeserializeFunc* func = new DeserializeFuncType<R>(message);
    400     deserialize_.reset(func);
    401   }
    402 
    403   // Do not change status if no message is received.
    404   void AllowNoMessage() { allow_not_getting_message_ = true; }
    405 
    406   bool got_message;
    407 
    408  protected:
    409   void AddOp(grpc_op* ops, size_t* nops) {
    410     if (!deserialize_) return;
    411     grpc_op* op = &ops[(*nops)++];
    412     op->op = GRPC_OP_RECV_MESSAGE;
    413     op->flags = 0;
    414     op->reserved = NULL;
    415     op->data.recv_message.recv_message = recv_buf_.c_buffer_ptr();
    416   }
    417 
    418   void FinishOp(bool* status) {
    419     if (!deserialize_) return;
    420     if (recv_buf_.Valid()) {
    421       if (*status) {
    422         got_message = true;
    423         *status = deserialize_->Deserialize(&recv_buf_).ok();
    424         recv_buf_.Release();
    425       } else {
    426         got_message = false;
    427         recv_buf_.Clear();
    428       }
    429     } else {
    430       got_message = false;
    431       if (!allow_not_getting_message_) {
    432         *status = false;
    433       }
    434     }
    435     deserialize_.reset();
    436   }
    437 
    438  private:
    439   std::unique_ptr<DeserializeFunc> deserialize_;
    440   ByteBuffer recv_buf_;
    441   bool allow_not_getting_message_;
    442 };
    443 
    444 class CallOpClientSendClose {
    445  public:
    446   CallOpClientSendClose() : send_(false) {}
    447 
    448   void ClientSendClose() { send_ = true; }
    449 
    450  protected:
    451   void AddOp(grpc_op* ops, size_t* nops) {
    452     if (!send_) return;
    453     grpc_op* op = &ops[(*nops)++];
    454     op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
    455     op->flags = 0;
    456     op->reserved = NULL;
    457   }
    458   void FinishOp(bool* status) { send_ = false; }
    459 
    460  private:
    461   bool send_;
    462 };
    463 
    464 class CallOpServerSendStatus {
    465  public:
    466   CallOpServerSendStatus() : send_status_available_(false) {}
    467 
    468   void ServerSendStatus(
    469       const std::multimap<grpc::string, grpc::string>& trailing_metadata,
    470       const Status& status) {
    471     send_error_details_ = status.error_details();
    472     trailing_metadata_ = FillMetadataArray(
    473         trailing_metadata, &trailing_metadata_count_, send_error_details_);
    474     send_status_available_ = true;
    475     send_status_code_ = static_cast<grpc_status_code>(status.error_code());
    476     send_error_message_ = status.error_message();
    477   }
    478 
    479  protected:
    480   void AddOp(grpc_op* ops, size_t* nops) {
    481     if (!send_status_available_) return;
    482     grpc_op* op = &ops[(*nops)++];
    483     op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
    484     op->data.send_status_from_server.trailing_metadata_count =
    485         trailing_metadata_count_;
    486     op->data.send_status_from_server.trailing_metadata = trailing_metadata_;
    487     op->data.send_status_from_server.status = send_status_code_;
    488     error_message_slice_ = SliceReferencingString(send_error_message_);
    489     op->data.send_status_from_server.status_details =
    490         send_error_message_.empty() ? nullptr : &error_message_slice_;
    491     op->flags = 0;
    492     op->reserved = NULL;
    493   }
    494 
    495   void FinishOp(bool* status) {
    496     if (!send_status_available_) return;
    497     g_core_codegen_interface->gpr_free(trailing_metadata_);
    498     send_status_available_ = false;
    499   }
    500 
    501  private:
    502   bool send_status_available_;
    503   grpc_status_code send_status_code_;
    504   grpc::string send_error_details_;
    505   grpc::string send_error_message_;
    506   size_t trailing_metadata_count_;
    507   grpc_metadata* trailing_metadata_;
    508   grpc_slice error_message_slice_;
    509 };
    510 
    511 class CallOpRecvInitialMetadata {
    512  public:
    513   CallOpRecvInitialMetadata() : metadata_map_(nullptr) {}
    514 
    515   void RecvInitialMetadata(ClientContext* context) {
    516     context->initial_metadata_received_ = true;
    517     metadata_map_ = &context->recv_initial_metadata_;
    518   }
    519 
    520  protected:
    521   void AddOp(grpc_op* ops, size_t* nops) {
    522     if (metadata_map_ == nullptr) return;
    523     grpc_op* op = &ops[(*nops)++];
    524     op->op = GRPC_OP_RECV_INITIAL_METADATA;
    525     op->data.recv_initial_metadata.recv_initial_metadata = metadata_map_->arr();
    526     op->flags = 0;
    527     op->reserved = NULL;
    528   }
    529 
    530   void FinishOp(bool* status) {
    531     if (metadata_map_ == nullptr) return;
    532     metadata_map_ = nullptr;
    533   }
    534 
    535  private:
    536   MetadataMap* metadata_map_;
    537 };
    538 
    539 class CallOpClientRecvStatus {
    540  public:
    541   CallOpClientRecvStatus()
    542       : recv_status_(nullptr), debug_error_string_(nullptr) {}
    543 
    544   void ClientRecvStatus(ClientContext* context, Status* status) {
    545     client_context_ = context;
    546     metadata_map_ = &client_context_->trailing_metadata_;
    547     recv_status_ = status;
    548     error_message_ = g_core_codegen_interface->grpc_empty_slice();
    549   }
    550 
    551  protected:
    552   void AddOp(grpc_op* ops, size_t* nops) {
    553     if (recv_status_ == nullptr) return;
    554     grpc_op* op = &ops[(*nops)++];
    555     op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
    556     op->data.recv_status_on_client.trailing_metadata = metadata_map_->arr();
    557     op->data.recv_status_on_client.status = &status_code_;
    558     op->data.recv_status_on_client.status_details = &error_message_;
    559     op->data.recv_status_on_client.error_string = &debug_error_string_;
    560     op->flags = 0;
    561     op->reserved = NULL;
    562   }
    563 
    564   void FinishOp(bool* status) {
    565     if (recv_status_ == nullptr) return;
    566     grpc::string binary_error_details = metadata_map_->GetBinaryErrorDetails();
    567     *recv_status_ =
    568         Status(static_cast<StatusCode>(status_code_),
    569                GRPC_SLICE_IS_EMPTY(error_message_)
    570                    ? grpc::string()
    571                    : grpc::string(GRPC_SLICE_START_PTR(error_message_),
    572                                   GRPC_SLICE_END_PTR(error_message_)),
    573                binary_error_details);
    574     client_context_->set_debug_error_string(
    575         debug_error_string_ != nullptr ? debug_error_string_ : "");
    576     g_core_codegen_interface->grpc_slice_unref(error_message_);
    577     if (debug_error_string_ != nullptr) {
    578       g_core_codegen_interface->gpr_free((void*)debug_error_string_);
    579     }
    580     recv_status_ = nullptr;
    581   }
    582 
    583  private:
    584   ClientContext* client_context_;
    585   MetadataMap* metadata_map_;
    586   Status* recv_status_;
    587   const char* debug_error_string_;
    588   grpc_status_code status_code_;
    589   grpc_slice error_message_;
    590 };
    591 
    592 /// An abstract collection of call ops, used to generate the
    593 /// grpc_call_op structure to pass down to the lower layers,
    594 /// and as it is-a CompletionQueueTag, also massages the final
    595 /// completion into the correct form for consumption in the C++
    596 /// API.
    597 class CallOpSetInterface : public CompletionQueueTag {
    598  public:
    599   /// Fills in grpc_op, starting from ops[*nops] and moving
    600   /// upwards.
    601   virtual void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) = 0;
    602 
    603   /// Get the tag to be used at the core completion queue. Generally, the
    604   /// value of cq_tag will be "this". However, it can be overridden if we
    605   /// want core to process the tag differently (e.g., as a core callback)
    606   virtual void* cq_tag() = 0;
    607 };
    608 
    609 /// Primary implementation of CallOpSetInterface.
    610 /// Since we cannot use variadic templates, we declare slots up to
    611 /// the maximum count of ops we'll need in a set. We leverage the
    612 /// empty base class optimization to slim this class (especially
    613 /// when there are many unused slots used). To avoid duplicate base classes,
    614 /// the template parmeter for CallNoOp is varied by argument position.
    615 template <class Op1 = CallNoOp<1>, class Op2 = CallNoOp<2>,
    616           class Op3 = CallNoOp<3>, class Op4 = CallNoOp<4>,
    617           class Op5 = CallNoOp<5>, class Op6 = CallNoOp<6>>
    618 class CallOpSet : public CallOpSetInterface,
    619                   public Op1,
    620                   public Op2,
    621                   public Op3,
    622                   public Op4,
    623                   public Op5,
    624                   public Op6 {
    625  public:
    626   CallOpSet() : cq_tag_(this), return_tag_(this), call_(nullptr) {}
    627   void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) override {
    628     this->Op1::AddOp(ops, nops);
    629     this->Op2::AddOp(ops, nops);
    630     this->Op3::AddOp(ops, nops);
    631     this->Op4::AddOp(ops, nops);
    632     this->Op5::AddOp(ops, nops);
    633     this->Op6::AddOp(ops, nops);
    634     g_core_codegen_interface->grpc_call_ref(call);
    635     call_ = call;
    636   }
    637 
    638   bool FinalizeResult(void** tag, bool* status) override {
    639     this->Op1::FinishOp(status);
    640     this->Op2::FinishOp(status);
    641     this->Op3::FinishOp(status);
    642     this->Op4::FinishOp(status);
    643     this->Op5::FinishOp(status);
    644     this->Op6::FinishOp(status);
    645     *tag = return_tag_;
    646 
    647     g_core_codegen_interface->grpc_call_unref(call_);
    648     return true;
    649   }
    650 
    651   void set_output_tag(void* return_tag) { return_tag_ = return_tag; }
    652 
    653   void* cq_tag() override { return cq_tag_; }
    654 
    655   /// set_cq_tag is used to provide a different core CQ tag than "this".
    656   /// This is used for callback-based tags, where the core tag is the core
    657   /// callback function. It does not change the use or behavior of any other
    658   /// function (such as FinalizeResult)
    659   void set_cq_tag(void* cq_tag) { cq_tag_ = cq_tag; }
    660 
    661  private:
    662   void* cq_tag_;
    663   void* return_tag_;
    664   grpc_call* call_;
    665 };
    666 
    667 /// Straightforward wrapping of the C call object
    668 class Call final {
    669  public:
    670   /** call is owned by the caller */
    671   Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq)
    672       : call_hook_(call_hook),
    673         cq_(cq),
    674         call_(call),
    675         max_receive_message_size_(-1) {}
    676 
    677   Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq,
    678        int max_receive_message_size)
    679       : call_hook_(call_hook),
    680         cq_(cq),
    681         call_(call),
    682         max_receive_message_size_(max_receive_message_size) {}
    683 
    684   void PerformOps(CallOpSetInterface* ops) {
    685     call_hook_->PerformOpsOnCall(ops, this);
    686   }
    687 
    688   grpc_call* call() const { return call_; }
    689   CompletionQueue* cq() const { return cq_; }
    690 
    691   int max_receive_message_size() const { return max_receive_message_size_; }
    692 
    693  private:
    694   CallHook* call_hook_;
    695   CompletionQueue* cq_;
    696   grpc_call* call_;
    697   int max_receive_message_size_;
    698 };
    699 }  // namespace internal
    700 }  // namespace grpc
    701 
    702 #endif  // GRPCPP_IMPL_CODEGEN_CALL_H
    703