Home | History | Annotate | Download | only in codegen
      1 /*
      2  *
      3  * Copyright 2015 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #ifndef GRPCPP_IMPL_CODEGEN_ASYNC_UNARY_CALL_H
     20 #define GRPCPP_IMPL_CODEGEN_ASYNC_UNARY_CALL_H
     21 
     22 #include <assert.h>
     23 #include <grpcpp/impl/codegen/call.h>
     24 #include <grpcpp/impl/codegen/channel_interface.h>
     25 #include <grpcpp/impl/codegen/client_context.h>
     26 #include <grpcpp/impl/codegen/server_context.h>
     27 #include <grpcpp/impl/codegen/service_type.h>
     28 #include <grpcpp/impl/codegen/status.h>
     29 
     30 namespace grpc {
     31 
     32 class CompletionQueue;
     33 extern CoreCodegenInterface* g_core_codegen_interface;
     34 
     35 /// An interface relevant for async client side unary RPCs (which send
     36 /// one request message to a server and receive one response message).
     37 template <class R>
     38 class ClientAsyncResponseReaderInterface {
     39  public:
     40   virtual ~ClientAsyncResponseReaderInterface() {}
     41 
     42   /// Start the call that was set up by the constructor, but only if the
     43   /// constructor was invoked through the "Prepare" API which doesn't actually
     44   /// start the call
     45   virtual void StartCall() = 0;
     46 
     47   /// Request notification of the reading of initial metadata. Completion
     48   /// will be notified by \a tag on the associated completion queue.
     49   /// This call is optional, but if it is used, it cannot be used concurrently
     50   /// with or after the \a Finish method.
     51   ///
     52   /// \param[in] tag Tag identifying this request.
     53   virtual void ReadInitialMetadata(void* tag) = 0;
     54 
     55   /// Request to receive the server's response \a msg and final \a status for
     56   /// the call, and to notify \a tag on this call's completion queue when
     57   /// finished.
     58   ///
     59   /// This function will return when either:
     60   /// - when the server's response message and status have been received.
     61   /// - when the server has returned a non-OK status (no message expected in
     62   ///   this case).
     63   /// - when the call failed for some reason and the library generated a
     64   ///   non-OK status.
     65   ///
     66   /// \param[in] tag Tag identifying this request.
     67   /// \param[out] status To be updated with the operation status.
     68   /// \param[out] msg To be filled in with the server's response message.
     69   virtual void Finish(R* msg, Status* status, void* tag) = 0;
     70 };
     71 
     72 namespace internal {
     73 template <class R>
     74 class ClientAsyncResponseReaderFactory {
     75  public:
     76   /// Start a call and write the request out if \a start is set.
     77   /// \a tag will be notified on \a cq when the call has been started (i.e.
     78   /// intitial metadata sent) and \a request has been written out.
     79   /// If \a start is not set, the actual call must be initiated by StartCall
     80   /// Note that \a context will be used to fill in custom initial metadata
     81   /// used to send to the server when starting the call.
     82   template <class W>
     83   static ClientAsyncResponseReader<R>* Create(
     84       ChannelInterface* channel, CompletionQueue* cq,
     85       const ::grpc::internal::RpcMethod& method, ClientContext* context,
     86       const W& request, bool start) {
     87     ::grpc::internal::Call call = channel->CreateCall(method, context, cq);
     88     return new (g_core_codegen_interface->grpc_call_arena_alloc(
     89         call.call(), sizeof(ClientAsyncResponseReader<R>)))
     90         ClientAsyncResponseReader<R>(call, context, request, start);
     91   }
     92 };
     93 }  // namespace internal
     94 
     95 /// Async API for client-side unary RPCs, where the message response
     96 /// received from the server is of type \a R.
     97 template <class R>
     98 class ClientAsyncResponseReader final
     99     : public ClientAsyncResponseReaderInterface<R> {
    100  public:
    101   // always allocated against a call arena, no memory free required
    102   static void operator delete(void* ptr, std::size_t size) {
    103     assert(size == sizeof(ClientAsyncResponseReader));
    104   }
    105 
    106   // This operator should never be called as the memory should be freed as part
    107   // of the arena destruction. It only exists to provide a matching operator
    108   // delete to the operator new so that some compilers will not complain (see
    109   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
    110   // there are no tests catching the compiler warning.
    111   static void operator delete(void*, void*) { assert(0); }
    112 
    113   void StartCall() override {
    114     assert(!started_);
    115     started_ = true;
    116     StartCallInternal();
    117   }
    118 
    119   /// See \a ClientAsyncResponseReaderInterface::ReadInitialMetadata for
    120   /// semantics.
    121   ///
    122   /// Side effect:
    123   ///   - the \a ClientContext associated with this call is updated with
    124   ///     possible initial and trailing metadata sent from the server.
    125   void ReadInitialMetadata(void* tag) override {
    126     assert(started_);
    127     GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
    128 
    129     single_buf.set_output_tag(tag);
    130     single_buf.RecvInitialMetadata(context_);
    131     call_.PerformOps(&single_buf);
    132     initial_metadata_read_ = true;
    133   }
    134 
    135   /// See \a ClientAysncResponseReaderInterface::Finish for semantics.
    136   ///
    137   /// Side effect:
    138   ///   - the \a ClientContext associated with this call is updated with
    139   ///     possible initial and trailing metadata sent from the server.
    140   void Finish(R* msg, Status* status, void* tag) override {
    141     assert(started_);
    142     if (initial_metadata_read_) {
    143       finish_buf.set_output_tag(tag);
    144       finish_buf.RecvMessage(msg);
    145       finish_buf.AllowNoMessage();
    146       finish_buf.ClientRecvStatus(context_, status);
    147       call_.PerformOps(&finish_buf);
    148     } else {
    149       single_buf.set_output_tag(tag);
    150       single_buf.RecvInitialMetadata(context_);
    151       single_buf.RecvMessage(msg);
    152       single_buf.AllowNoMessage();
    153       single_buf.ClientRecvStatus(context_, status);
    154       call_.PerformOps(&single_buf);
    155     }
    156   }
    157 
    158  private:
    159   friend class internal::ClientAsyncResponseReaderFactory<R>;
    160   ClientContext* const context_;
    161   ::grpc::internal::Call call_;
    162   bool started_;
    163   bool initial_metadata_read_ = false;
    164 
    165   template <class W>
    166   ClientAsyncResponseReader(::grpc::internal::Call call, ClientContext* context,
    167                             const W& request, bool start)
    168       : context_(context), call_(call), started_(start) {
    169     // Bind the metadata at time of StartCallInternal but set up the rest here
    170     // TODO(ctiller): don't assert
    171     GPR_CODEGEN_ASSERT(single_buf.SendMessage(request).ok());
    172     single_buf.ClientSendClose();
    173     if (start) StartCallInternal();
    174   }
    175 
    176   void StartCallInternal() {
    177     single_buf.SendInitialMetadata(context_->send_initial_metadata_,
    178                                    context_->initial_metadata_flags());
    179   }
    180 
    181   // disable operator new
    182   static void* operator new(std::size_t size);
    183   static void* operator new(std::size_t size, void* p) { return p; }
    184 
    185   ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
    186                               ::grpc::internal::CallOpSendMessage,
    187                               ::grpc::internal::CallOpClientSendClose,
    188                               ::grpc::internal::CallOpRecvInitialMetadata,
    189                               ::grpc::internal::CallOpRecvMessage<R>,
    190                               ::grpc::internal::CallOpClientRecvStatus>
    191       single_buf;
    192   ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>,
    193                               ::grpc::internal::CallOpClientRecvStatus>
    194       finish_buf;
    195 };
    196 
    197 /// Async server-side API for handling unary calls, where the single
    198 /// response message sent to the client is of type \a W.
    199 template <class W>
    200 class ServerAsyncResponseWriter final
    201     : public internal::ServerAsyncStreamingInterface {
    202  public:
    203   explicit ServerAsyncResponseWriter(ServerContext* ctx)
    204       : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
    205 
    206   /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
    207   ///
    208   /// Side effect:
    209   ///   The initial metadata that will be sent to the client from this op will
    210   ///   be taken from the \a ServerContext associated with the call.
    211   ///
    212   /// \param[in] tag Tag identifying this request.
    213   void SendInitialMetadata(void* tag) override {
    214     GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
    215 
    216     meta_buf_.set_output_tag(tag);
    217     meta_buf_.SendInitialMetadata(ctx_->initial_metadata_,
    218                                   ctx_->initial_metadata_flags());
    219     if (ctx_->compression_level_set()) {
    220       meta_buf_.set_compression_level(ctx_->compression_level());
    221     }
    222     ctx_->sent_initial_metadata_ = true;
    223     call_.PerformOps(&meta_buf_);
    224   }
    225 
    226   /// Indicate that the stream is to be finished and request notification
    227   /// when the server has sent the appropriate signals to the client to
    228   /// end the call. Should not be used concurrently with other operations.
    229   ///
    230   /// \param[in] tag Tag identifying this request.
    231   /// \param[in] status To be sent to the client as the result of the call.
    232   /// \param[in] msg Message to be sent to the client.
    233   ///
    234   /// Side effect:
    235   ///   - also sends initial metadata if not already sent (using the
    236   ///     \a ServerContext associated with this call).
    237   ///
    238   /// Note: if \a status has a non-OK code, then \a msg will not be sent,
    239   /// and the client will receive only the status with possible trailing
    240   /// metadata.
    241   void Finish(const W& msg, const Status& status, void* tag) {
    242     finish_buf_.set_output_tag(tag);
    243     if (!ctx_->sent_initial_metadata_) {
    244       finish_buf_.SendInitialMetadata(ctx_->initial_metadata_,
    245                                       ctx_->initial_metadata_flags());
    246       if (ctx_->compression_level_set()) {
    247         finish_buf_.set_compression_level(ctx_->compression_level());
    248       }
    249       ctx_->sent_initial_metadata_ = true;
    250     }
    251     // The response is dropped if the status is not OK.
    252     if (status.ok()) {
    253       finish_buf_.ServerSendStatus(ctx_->trailing_metadata_,
    254                                    finish_buf_.SendMessage(msg));
    255     } else {
    256       finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
    257     }
    258     call_.PerformOps(&finish_buf_);
    259   }
    260 
    261   /// Indicate that the stream is to be finished with a non-OK status,
    262   /// and request notification for when the server has finished sending the
    263   /// appropriate signals to the client to end the call.
    264   /// Should not be used concurrently with other operations.
    265   ///
    266   /// \param[in] tag Tag identifying this request.
    267   /// \param[in] status To be sent to the client as the result of the call.
    268   ///   - Note: \a status must have a non-OK code.
    269   ///
    270   /// Side effect:
    271   ///   - also sends initial metadata if not already sent (using the
    272   ///     \a ServerContext associated with this call).
    273   void FinishWithError(const Status& status, void* tag) {
    274     GPR_CODEGEN_ASSERT(!status.ok());
    275     finish_buf_.set_output_tag(tag);
    276     if (!ctx_->sent_initial_metadata_) {
    277       finish_buf_.SendInitialMetadata(ctx_->initial_metadata_,
    278                                       ctx_->initial_metadata_flags());
    279       if (ctx_->compression_level_set()) {
    280         finish_buf_.set_compression_level(ctx_->compression_level());
    281       }
    282       ctx_->sent_initial_metadata_ = true;
    283     }
    284     finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
    285     call_.PerformOps(&finish_buf_);
    286   }
    287 
    288  private:
    289   void BindCall(::grpc::internal::Call* call) override { call_ = *call; }
    290 
    291   ::grpc::internal::Call call_;
    292   ServerContext* ctx_;
    293   ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
    294       meta_buf_;
    295   ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
    296                               ::grpc::internal::CallOpSendMessage,
    297                               ::grpc::internal::CallOpServerSendStatus>
    298       finish_buf_;
    299 };
    300 
    301 }  // namespace grpc
    302 
    303 namespace std {
    304 template <class R>
    305 class default_delete<grpc::ClientAsyncResponseReader<R>> {
    306  public:
    307   void operator()(void* p) {}
    308 };
    309 template <class R>
    310 class default_delete<grpc::ClientAsyncResponseReaderInterface<R>> {
    311  public:
    312   void operator()(void* p) {}
    313 };
    314 }  // namespace std
    315 
    316 #endif  // GRPCPP_IMPL_CODEGEN_ASYNC_UNARY_CALL_H
    317