Home | History | Annotate | Download | only in codegen
      1 /*
      2  *
      3  * Copyright 2015 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #ifndef GRPCPP_IMPL_CODEGEN_SYNC_STREAM_H
     20 #define GRPCPP_IMPL_CODEGEN_SYNC_STREAM_H
     21 
     22 #include <grpcpp/impl/codegen/call.h>
     23 #include <grpcpp/impl/codegen/channel_interface.h>
     24 #include <grpcpp/impl/codegen/client_context.h>
     25 #include <grpcpp/impl/codegen/completion_queue.h>
     26 #include <grpcpp/impl/codegen/core_codegen_interface.h>
     27 #include <grpcpp/impl/codegen/server_context.h>
     28 #include <grpcpp/impl/codegen/service_type.h>
     29 #include <grpcpp/impl/codegen/status.h>
     30 
     31 namespace grpc {
     32 
     33 namespace internal {
     34 /// Common interface for all synchronous client side streaming.
     35 class ClientStreamingInterface {
     36  public:
     37   virtual ~ClientStreamingInterface() {}
     38 
     39   /// Block waiting until the stream finishes and a final status of the call is
     40   /// available.
     41   ///
     42   /// It is appropriate to call this method when both:
     43   ///   * the calling code (client-side) has no more message to send
     44   ///     (this can be declared implicitly by calling this method, or
     45   ///     explicitly through an earlier call to <i>WritesDone</i> method of the
     46   ///     class in use, e.g. \a ClientWriterInterface::WritesDone or
     47   ///     \a ClientReaderWriterInterface::WritesDone).
     48   ///   * there are no more messages to be received from the server (which can
     49   ///     be known implicitly, or explicitly from an earlier call to \a
     50   ///     ReaderInterface::Read that returned "false").
     51   ///
     52   /// This function will return either:
     53   /// - when all incoming messages have been read and the server has
     54   ///   returned status.
     55   /// - when the server has returned a non-OK status.
     56   /// - OR when the call failed for some reason and the library generated a
     57   ///   status.
     58   ///
     59   /// Return values:
     60   ///   - \a Status contains the status code, message and details for the call
     61   ///   - the \a ClientContext associated with this call is updated with
     62   ///     possible trailing metadata sent from the server.
     63   virtual Status Finish() = 0;
     64 };
     65 
     66 /// Common interface for all synchronous server side streaming.
     67 class ServerStreamingInterface {
     68  public:
     69   virtual ~ServerStreamingInterface() {}
     70 
     71   /// Block to send initial metadata to client.
     72   /// This call is optional, but if it is used, it cannot be used concurrently
     73   /// with or after the \a Finish method.
     74   ///
     75   /// The initial metadata that will be sent to the client will be
     76   /// taken from the \a ServerContext associated with the call.
     77   virtual void SendInitialMetadata() = 0;
     78 };
     79 
     80 /// An interface that yields a sequence of messages of type \a R.
     81 template <class R>
     82 class ReaderInterface {
     83  public:
     84   virtual ~ReaderInterface() {}
     85 
     86   /// Get an upper bound on the next message size available for reading on this
     87   /// stream.
     88   virtual bool NextMessageSize(uint32_t* sz) = 0;
     89 
     90   /// Block to read a message and parse to \a msg. Returns \a true on success.
     91   /// This is thread-safe with respect to \a Write or \WritesDone methods on
     92   /// the same stream. It should not be called concurrently with another \a
     93   /// Read on the same stream as the order of delivery will not be defined.
     94   ///
     95   /// \param[out] msg The read message.
     96   ///
     97   /// \return \a false when there will be no more incoming messages, either
     98   /// because the other side has called \a WritesDone() or the stream has failed
     99   /// (or been cancelled).
    100   virtual bool Read(R* msg) = 0;
    101 };
    102 
    103 /// An interface that can be fed a sequence of messages of type \a W.
    104 template <class W>
    105 class WriterInterface {
    106  public:
    107   virtual ~WriterInterface() {}
    108 
    109   /// Block to write \a msg to the stream with WriteOptions \a options.
    110   /// This is thread-safe with respect to \a ReaderInterface::Read
    111   ///
    112   /// \param msg The message to be written to the stream.
    113   /// \param options The WriteOptions affecting the write operation.
    114   ///
    115   /// \return \a true on success, \a false when the stream has been closed.
    116   virtual bool Write(const W& msg, WriteOptions options) = 0;
    117 
    118   /// Block to write \a msg to the stream with default write options.
    119   /// This is thread-safe with respect to \a ReaderInterface::Read
    120   ///
    121   /// \param msg The message to be written to the stream.
    122   ///
    123   /// \return \a true on success, \a false when the stream has been closed.
    124   inline bool Write(const W& msg) { return Write(msg, WriteOptions()); }
    125 
    126   /// Write \a msg and coalesce it with the writing of trailing metadata, using
    127   /// WriteOptions \a options.
    128   ///
    129   /// For client, WriteLast is equivalent of performing Write and WritesDone in
    130   /// a single step. \a msg and trailing metadata are coalesced and sent on wire
    131   /// by calling this function. For server, WriteLast buffers the \a msg.
    132   /// The writing of \a msg is held until the service handler returns,
    133   /// where \a msg and trailing metadata are coalesced and sent on wire.
    134   /// Note that WriteLast can only buffer \a msg up to the flow control window
    135   /// size. If \a msg size is larger than the window size, it will be sent on
    136   /// wire without buffering.
    137   ///
    138   /// \param[in] msg The message to be written to the stream.
    139   /// \param[in] options The WriteOptions to be used to write this message.
    140   void WriteLast(const W& msg, WriteOptions options) {
    141     Write(msg, options.set_last_message());
    142   }
    143 };
    144 
    145 }  // namespace internal
    146 
    147 /// Client-side interface for streaming reads of message of type \a R.
    148 template <class R>
    149 class ClientReaderInterface : public internal::ClientStreamingInterface,
    150                               public internal::ReaderInterface<R> {
    151  public:
    152   /// Block to wait for initial metadata from server. The received metadata
    153   /// can only be accessed after this call returns. Should only be called before
    154   /// the first read. Calling this method is optional, and if it is not called
    155   /// the metadata will be available in ClientContext after the first read.
    156   virtual void WaitForInitialMetadata() = 0;
    157 };
    158 
    159 namespace internal {
    160 template <class R>
    161 class ClientReaderFactory {
    162  public:
    163   template <class W>
    164   static ClientReader<R>* Create(ChannelInterface* channel,
    165                                  const ::grpc::internal::RpcMethod& method,
    166                                  ClientContext* context, const W& request) {
    167     return new ClientReader<R>(channel, method, context, request);
    168   }
    169 };
    170 }  // namespace internal
    171 
    172 /// Synchronous (blocking) client-side API for doing server-streaming RPCs,
    173 /// where the stream of messages coming from the server has messages
    174 /// of type \a R.
    175 template <class R>
    176 class ClientReader final : public ClientReaderInterface<R> {
    177  public:
    178   /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for
    179   /// semantics.
    180   ///
    181   //  Side effect:
    182   ///   Once complete, the initial metadata read from
    183   ///   the server will be accessable through the \a ClientContext used to
    184   ///   construct this object.
    185   void WaitForInitialMetadata() override {
    186     GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
    187 
    188     ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
    189         ops;
    190     ops.RecvInitialMetadata(context_);
    191     call_.PerformOps(&ops);
    192     cq_.Pluck(&ops);  /// status ignored
    193   }
    194 
    195   bool NextMessageSize(uint32_t* sz) override {
    196     *sz = call_.max_receive_message_size();
    197     return true;
    198   }
    199 
    200   /// See the \a ReaderInterface.Read method for semantics.
    201   /// Side effect:
    202   ///   This also receives initial metadata from the server, if not
    203   ///   already received (if initial metadata is received, it can be then
    204   ///   accessed through the \a ClientContext associated with this call).
    205   bool Read(R* msg) override {
    206     ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
    207                                 ::grpc::internal::CallOpRecvMessage<R>>
    208         ops;
    209     if (!context_->initial_metadata_received_) {
    210       ops.RecvInitialMetadata(context_);
    211     }
    212     ops.RecvMessage(msg);
    213     call_.PerformOps(&ops);
    214     return cq_.Pluck(&ops) && ops.got_message;
    215   }
    216 
    217   /// See the \a ClientStreamingInterface.Finish method for semantics.
    218   ///
    219   /// Side effect:
    220   ///   The \a ClientContext associated with this call is updated with
    221   ///   possible metadata received from the server.
    222   Status Finish() override {
    223     ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientRecvStatus> ops;
    224     Status status;
    225     ops.ClientRecvStatus(context_, &status);
    226     call_.PerformOps(&ops);
    227     GPR_CODEGEN_ASSERT(cq_.Pluck(&ops));
    228     return status;
    229   }
    230 
    231  private:
    232   friend class internal::ClientReaderFactory<R>;
    233   ClientContext* context_;
    234   CompletionQueue cq_;
    235   ::grpc::internal::Call call_;
    236 
    237   /// Block to create a stream and write the initial metadata and \a request
    238   /// out. Note that \a context will be used to fill in custom initial
    239   /// metadata used to send to the server when starting the call.
    240   template <class W>
    241   ClientReader(::grpc::ChannelInterface* channel,
    242                const ::grpc::internal::RpcMethod& method,
    243                ClientContext* context, const W& request)
    244       : context_(context),
    245         cq_(grpc_completion_queue_attributes{
    246             GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING,
    247             nullptr}),  // Pluckable cq
    248         call_(channel->CreateCall(method, context, &cq_)) {
    249     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
    250                                 ::grpc::internal::CallOpSendMessage,
    251                                 ::grpc::internal::CallOpClientSendClose>
    252         ops;
    253     ops.SendInitialMetadata(context->send_initial_metadata_,
    254                             context->initial_metadata_flags());
    255     // TODO(ctiller): don't assert
    256     GPR_CODEGEN_ASSERT(ops.SendMessage(request).ok());
    257     ops.ClientSendClose();
    258     call_.PerformOps(&ops);
    259     cq_.Pluck(&ops);
    260   }
    261 };
    262 
    263 /// Client-side interface for streaming writes of message type \a W.
    264 template <class W>
    265 class ClientWriterInterface : public internal::ClientStreamingInterface,
    266                               public internal::WriterInterface<W> {
    267  public:
    268   /// Half close writing from the client. (signal that the stream of messages
    269   /// coming from the client is complete).
    270   /// Blocks until currently-pending writes are completed.
    271   /// Thread safe with respect to \a ReaderInterface::Read operations only
    272   ///
    273   /// \return Whether the writes were successful.
    274   virtual bool WritesDone() = 0;
    275 };
    276 
    277 namespace internal {
    278 template <class W>
    279 class ClientWriterFactory {
    280  public:
    281   template <class R>
    282   static ClientWriter<W>* Create(::grpc::ChannelInterface* channel,
    283                                  const ::grpc::internal::RpcMethod& method,
    284                                  ClientContext* context, R* response) {
    285     return new ClientWriter<W>(channel, method, context, response);
    286   }
    287 };
    288 }  // namespace internal
    289 
    290 /// Synchronous (blocking) client-side API for doing client-streaming RPCs,
    291 /// where the outgoing message stream coming from the client has messages of
    292 /// type \a W.
    293 template <class W>
    294 class ClientWriter : public ClientWriterInterface<W> {
    295  public:
    296   /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for
    297   /// semantics.
    298   ///
    299   //  Side effect:
    300   ///   Once complete, the initial metadata read from the server will be
    301   ///   accessable through the \a ClientContext used to construct this object.
    302   void WaitForInitialMetadata() {
    303     GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
    304 
    305     ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
    306         ops;
    307     ops.RecvInitialMetadata(context_);
    308     call_.PerformOps(&ops);
    309     cq_.Pluck(&ops);  // status ignored
    310   }
    311 
    312   /// See the WriterInterface.Write(const W& msg, WriteOptions options) method
    313   /// for semantics.
    314   ///
    315   /// Side effect:
    316   ///   Also sends initial metadata if not already sent (using the
    317   ///   \a ClientContext associated with this call).
    318   using ::grpc::internal::WriterInterface<W>::Write;
    319   bool Write(const W& msg, WriteOptions options) override {
    320     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
    321                                 ::grpc::internal::CallOpSendMessage,
    322                                 ::grpc::internal::CallOpClientSendClose>
    323         ops;
    324 
    325     if (options.is_last_message()) {
    326       options.set_buffer_hint();
    327       ops.ClientSendClose();
    328     }
    329     if (context_->initial_metadata_corked_) {
    330       ops.SendInitialMetadata(context_->send_initial_metadata_,
    331                               context_->initial_metadata_flags());
    332       context_->set_initial_metadata_corked(false);
    333     }
    334     if (!ops.SendMessage(msg, options).ok()) {
    335       return false;
    336     }
    337 
    338     call_.PerformOps(&ops);
    339     return cq_.Pluck(&ops);
    340   }
    341 
    342   bool WritesDone() override {
    343     ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientSendClose> ops;
    344     ops.ClientSendClose();
    345     call_.PerformOps(&ops);
    346     return cq_.Pluck(&ops);
    347   }
    348 
    349   /// See the ClientStreamingInterface.Finish method for semantics.
    350   /// Side effects:
    351   ///   - Also receives initial metadata if not already received.
    352   ///   - Attempts to fill in the \a response parameter passed
    353   ///     to the constructor of this instance with the response
    354   ///     message from the server.
    355   Status Finish() override {
    356     Status status;
    357     if (!context_->initial_metadata_received_) {
    358       finish_ops_.RecvInitialMetadata(context_);
    359     }
    360     finish_ops_.ClientRecvStatus(context_, &status);
    361     call_.PerformOps(&finish_ops_);
    362     GPR_CODEGEN_ASSERT(cq_.Pluck(&finish_ops_));
    363     return status;
    364   }
    365 
    366  private:
    367   friend class internal::ClientWriterFactory<W>;
    368 
    369   /// Block to create a stream (i.e. send request headers and other initial
    370   /// metadata to the server). Note that \a context will be used to fill
    371   /// in custom initial metadata. \a response will be filled in with the
    372   /// single expected response message from the server upon a successful
    373   /// call to the \a Finish method of this instance.
    374   template <class R>
    375   ClientWriter(ChannelInterface* channel,
    376                const ::grpc::internal::RpcMethod& method,
    377                ClientContext* context, R* response)
    378       : context_(context),
    379         cq_(grpc_completion_queue_attributes{
    380             GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING,
    381             nullptr}),  // Pluckable cq
    382         call_(channel->CreateCall(method, context, &cq_)) {
    383     finish_ops_.RecvMessage(response);
    384     finish_ops_.AllowNoMessage();
    385 
    386     if (!context_->initial_metadata_corked_) {
    387       ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
    388           ops;
    389       ops.SendInitialMetadata(context->send_initial_metadata_,
    390                               context->initial_metadata_flags());
    391       call_.PerformOps(&ops);
    392       cq_.Pluck(&ops);
    393     }
    394   }
    395 
    396   ClientContext* context_;
    397   ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
    398                               ::grpc::internal::CallOpGenericRecvMessage,
    399                               ::grpc::internal::CallOpClientRecvStatus>
    400       finish_ops_;
    401   CompletionQueue cq_;
    402   ::grpc::internal::Call call_;
    403 };
    404 
    405 /// Client-side interface for bi-directional streaming with
    406 /// client-to-server stream messages of type \a W and
    407 /// server-to-client stream messages of type \a R.
    408 template <class W, class R>
    409 class ClientReaderWriterInterface : public internal::ClientStreamingInterface,
    410                                     public internal::WriterInterface<W>,
    411                                     public internal::ReaderInterface<R> {
    412  public:
    413   /// Block to wait for initial metadata from server. The received metadata
    414   /// can only be accessed after this call returns. Should only be called before
    415   /// the first read. Calling this method is optional, and if it is not called
    416   /// the metadata will be available in ClientContext after the first read.
    417   virtual void WaitForInitialMetadata() = 0;
    418 
    419   /// Half close writing from the client. (signal that the stream of messages
    420   /// coming from the clinet is complete).
    421   /// Blocks until currently-pending writes are completed.
    422   /// Thread-safe with respect to \a ReaderInterface::Read
    423   ///
    424   /// \return Whether the writes were successful.
    425   virtual bool WritesDone() = 0;
    426 };
    427 
    428 namespace internal {
    429 template <class W, class R>
    430 class ClientReaderWriterFactory {
    431  public:
    432   static ClientReaderWriter<W, R>* Create(
    433       ::grpc::ChannelInterface* channel,
    434       const ::grpc::internal::RpcMethod& method, ClientContext* context) {
    435     return new ClientReaderWriter<W, R>(channel, method, context);
    436   }
    437 };
    438 }  // namespace internal
    439 
    440 /// Synchronous (blocking) client-side API for bi-directional streaming RPCs,
    441 /// where the outgoing message stream coming from the client has messages of
    442 /// type \a W, and the incoming messages stream coming from the server has
    443 /// messages of type \a R.
    444 template <class W, class R>
    445 class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
    446  public:
    447   /// Block waiting to read initial metadata from the server.
    448   /// This call is optional, but if it is used, it cannot be used concurrently
    449   /// with or after the \a Finish method.
    450   ///
    451   /// Once complete, the initial metadata read from the server will be
    452   /// accessable through the \a ClientContext used to construct this object.
    453   void WaitForInitialMetadata() override {
    454     GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
    455 
    456     ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
    457         ops;
    458     ops.RecvInitialMetadata(context_);
    459     call_.PerformOps(&ops);
    460     cq_.Pluck(&ops);  // status ignored
    461   }
    462 
    463   bool NextMessageSize(uint32_t* sz) override {
    464     *sz = call_.max_receive_message_size();
    465     return true;
    466   }
    467 
    468   /// See the \a ReaderInterface.Read method for semantics.
    469   /// Side effect:
    470   ///   Also receives initial metadata if not already received (updates the \a
    471   ///   ClientContext associated with this call in that case).
    472   bool Read(R* msg) override {
    473     ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
    474                                 ::grpc::internal::CallOpRecvMessage<R>>
    475         ops;
    476     if (!context_->initial_metadata_received_) {
    477       ops.RecvInitialMetadata(context_);
    478     }
    479     ops.RecvMessage(msg);
    480     call_.PerformOps(&ops);
    481     return cq_.Pluck(&ops) && ops.got_message;
    482   }
    483 
    484   /// See the \a WriterInterface.Write method for semantics.
    485   ///
    486   /// Side effect:
    487   ///   Also sends initial metadata if not already sent (using the
    488   ///   \a ClientContext associated with this call to fill in values).
    489   using ::grpc::internal::WriterInterface<W>::Write;
    490   bool Write(const W& msg, WriteOptions options) override {
    491     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
    492                                 ::grpc::internal::CallOpSendMessage,
    493                                 ::grpc::internal::CallOpClientSendClose>
    494         ops;
    495 
    496     if (options.is_last_message()) {
    497       options.set_buffer_hint();
    498       ops.ClientSendClose();
    499     }
    500     if (context_->initial_metadata_corked_) {
    501       ops.SendInitialMetadata(context_->send_initial_metadata_,
    502                               context_->initial_metadata_flags());
    503       context_->set_initial_metadata_corked(false);
    504     }
    505     if (!ops.SendMessage(msg, options).ok()) {
    506       return false;
    507     }
    508 
    509     call_.PerformOps(&ops);
    510     return cq_.Pluck(&ops);
    511   }
    512 
    513   bool WritesDone() override {
    514     ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientSendClose> ops;
    515     ops.ClientSendClose();
    516     call_.PerformOps(&ops);
    517     return cq_.Pluck(&ops);
    518   }
    519 
    520   /// See the ClientStreamingInterface.Finish method for semantics.
    521   ///
    522   /// Side effect:
    523   ///   - the \a ClientContext associated with this call is updated with
    524   ///     possible trailing metadata sent from the server.
    525   Status Finish() override {
    526     ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
    527                                 ::grpc::internal::CallOpClientRecvStatus>
    528         ops;
    529     if (!context_->initial_metadata_received_) {
    530       ops.RecvInitialMetadata(context_);
    531     }
    532     Status status;
    533     ops.ClientRecvStatus(context_, &status);
    534     call_.PerformOps(&ops);
    535     GPR_CODEGEN_ASSERT(cq_.Pluck(&ops));
    536     return status;
    537   }
    538 
    539  private:
    540   friend class internal::ClientReaderWriterFactory<W, R>;
    541 
    542   ClientContext* context_;
    543   CompletionQueue cq_;
    544   ::grpc::internal::Call call_;
    545 
    546   /// Block to create a stream and write the initial metadata and \a request
    547   /// out. Note that \a context will be used to fill in custom initial metadata
    548   /// used to send to the server when starting the call.
    549   ClientReaderWriter(::grpc::ChannelInterface* channel,
    550                      const ::grpc::internal::RpcMethod& method,
    551                      ClientContext* context)
    552       : context_(context),
    553         cq_(grpc_completion_queue_attributes{
    554             GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING,
    555             nullptr}),  // Pluckable cq
    556         call_(channel->CreateCall(method, context, &cq_)) {
    557     if (!context_->initial_metadata_corked_) {
    558       ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
    559           ops;
    560       ops.SendInitialMetadata(context->send_initial_metadata_,
    561                               context->initial_metadata_flags());
    562       call_.PerformOps(&ops);
    563       cq_.Pluck(&ops);
    564     }
    565   }
    566 };
    567 
    568 /// Server-side interface for streaming reads of message of type \a R.
    569 template <class R>
    570 class ServerReaderInterface : public internal::ServerStreamingInterface,
    571                               public internal::ReaderInterface<R> {};
    572 
    573 /// Synchronous (blocking) server-side API for doing client-streaming RPCs,
    574 /// where the incoming message stream coming from the client has messages of
    575 /// type \a R.
    576 template <class R>
    577 class ServerReader final : public ServerReaderInterface<R> {
    578  public:
    579   /// See the \a ServerStreamingInterface.SendInitialMetadata method
    580   /// for semantics. Note that initial metadata will be affected by the
    581   /// \a ServerContext associated with this call.
    582   void SendInitialMetadata() override {
    583     GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
    584 
    585     internal::CallOpSet<internal::CallOpSendInitialMetadata> ops;
    586     ops.SendInitialMetadata(ctx_->initial_metadata_,
    587                             ctx_->initial_metadata_flags());
    588     if (ctx_->compression_level_set()) {
    589       ops.set_compression_level(ctx_->compression_level());
    590     }
    591     ctx_->sent_initial_metadata_ = true;
    592     call_->PerformOps(&ops);
    593     call_->cq()->Pluck(&ops);
    594   }
    595 
    596   bool NextMessageSize(uint32_t* sz) override {
    597     *sz = call_->max_receive_message_size();
    598     return true;
    599   }
    600 
    601   bool Read(R* msg) override {
    602     internal::CallOpSet<internal::CallOpRecvMessage<R>> ops;
    603     ops.RecvMessage(msg);
    604     call_->PerformOps(&ops);
    605     return call_->cq()->Pluck(&ops) && ops.got_message;
    606   }
    607 
    608  private:
    609   internal::Call* const call_;
    610   ServerContext* const ctx_;
    611 
    612   template <class ServiceType, class RequestType, class ResponseType>
    613   friend class internal::ClientStreamingHandler;
    614 
    615   ServerReader(internal::Call* call, ServerContext* ctx)
    616       : call_(call), ctx_(ctx) {}
    617 };
    618 
    619 /// Server-side interface for streaming writes of message of type \a W.
    620 template <class W>
    621 class ServerWriterInterface : public internal::ServerStreamingInterface,
    622                               public internal::WriterInterface<W> {};
    623 
    624 /// Synchronous (blocking) server-side API for doing for doing a
    625 /// server-streaming RPCs, where the outgoing message stream coming from the
    626 /// server has messages of type \a W.
    627 template <class W>
    628 class ServerWriter final : public ServerWriterInterface<W> {
    629  public:
    630   /// See the \a ServerStreamingInterface.SendInitialMetadata method
    631   /// for semantics.
    632   /// Note that initial metadata will be affected by the
    633   /// \a ServerContext associated with this call.
    634   void SendInitialMetadata() override {
    635     GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
    636 
    637     internal::CallOpSet<internal::CallOpSendInitialMetadata> ops;
    638     ops.SendInitialMetadata(ctx_->initial_metadata_,
    639                             ctx_->initial_metadata_flags());
    640     if (ctx_->compression_level_set()) {
    641       ops.set_compression_level(ctx_->compression_level());
    642     }
    643     ctx_->sent_initial_metadata_ = true;
    644     call_->PerformOps(&ops);
    645     call_->cq()->Pluck(&ops);
    646   }
    647 
    648   /// See the \a WriterInterface.Write method for semantics.
    649   ///
    650   /// Side effect:
    651   ///   Also sends initial metadata if not already sent (using the
    652   ///   \a ClientContext associated with this call to fill in values).
    653   using internal::WriterInterface<W>::Write;
    654   bool Write(const W& msg, WriteOptions options) override {
    655     if (options.is_last_message()) {
    656       options.set_buffer_hint();
    657     }
    658 
    659     if (!ctx_->pending_ops_.SendMessage(msg, options).ok()) {
    660       return false;
    661     }
    662     if (!ctx_->sent_initial_metadata_) {
    663       ctx_->pending_ops_.SendInitialMetadata(ctx_->initial_metadata_,
    664                                              ctx_->initial_metadata_flags());
    665       if (ctx_->compression_level_set()) {
    666         ctx_->pending_ops_.set_compression_level(ctx_->compression_level());
    667       }
    668       ctx_->sent_initial_metadata_ = true;
    669     }
    670     call_->PerformOps(&ctx_->pending_ops_);
    671     // if this is the last message we defer the pluck until AFTER we start
    672     // the trailing md op. This prevents hangs. See
    673     // https://github.com/grpc/grpc/issues/11546
    674     if (options.is_last_message()) {
    675       ctx_->has_pending_ops_ = true;
    676       return true;
    677     }
    678     ctx_->has_pending_ops_ = false;
    679     return call_->cq()->Pluck(&ctx_->pending_ops_);
    680   }
    681 
    682  private:
    683   internal::Call* const call_;
    684   ServerContext* const ctx_;
    685 
    686   template <class ServiceType, class RequestType, class ResponseType>
    687   friend class internal::ServerStreamingHandler;
    688 
    689   ServerWriter(internal::Call* call, ServerContext* ctx)
    690       : call_(call), ctx_(ctx) {}
    691 };
    692 
    693 /// Server-side interface for bi-directional streaming.
    694 template <class W, class R>
    695 class ServerReaderWriterInterface : public internal::ServerStreamingInterface,
    696                                     public internal::WriterInterface<W>,
    697                                     public internal::ReaderInterface<R> {};
    698 
    699 /// Actual implementation of bi-directional streaming
    700 namespace internal {
    701 template <class W, class R>
    702 class ServerReaderWriterBody final {
    703  public:
    704   ServerReaderWriterBody(Call* call, ServerContext* ctx)
    705       : call_(call), ctx_(ctx) {}
    706 
    707   void SendInitialMetadata() {
    708     GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
    709 
    710     CallOpSet<CallOpSendInitialMetadata> ops;
    711     ops.SendInitialMetadata(ctx_->initial_metadata_,
    712                             ctx_->initial_metadata_flags());
    713     if (ctx_->compression_level_set()) {
    714       ops.set_compression_level(ctx_->compression_level());
    715     }
    716     ctx_->sent_initial_metadata_ = true;
    717     call_->PerformOps(&ops);
    718     call_->cq()->Pluck(&ops);
    719   }
    720 
    721   bool NextMessageSize(uint32_t* sz) {
    722     *sz = call_->max_receive_message_size();
    723     return true;
    724   }
    725 
    726   bool Read(R* msg) {
    727     CallOpSet<CallOpRecvMessage<R>> ops;
    728     ops.RecvMessage(msg);
    729     call_->PerformOps(&ops);
    730     return call_->cq()->Pluck(&ops) && ops.got_message;
    731   }
    732 
    733   bool Write(const W& msg, WriteOptions options) {
    734     if (options.is_last_message()) {
    735       options.set_buffer_hint();
    736     }
    737     if (!ctx_->pending_ops_.SendMessage(msg, options).ok()) {
    738       return false;
    739     }
    740     if (!ctx_->sent_initial_metadata_) {
    741       ctx_->pending_ops_.SendInitialMetadata(ctx_->initial_metadata_,
    742                                              ctx_->initial_metadata_flags());
    743       if (ctx_->compression_level_set()) {
    744         ctx_->pending_ops_.set_compression_level(ctx_->compression_level());
    745       }
    746       ctx_->sent_initial_metadata_ = true;
    747     }
    748     call_->PerformOps(&ctx_->pending_ops_);
    749     // if this is the last message we defer the pluck until AFTER we start
    750     // the trailing md op. This prevents hangs. See
    751     // https://github.com/grpc/grpc/issues/11546
    752     if (options.is_last_message()) {
    753       ctx_->has_pending_ops_ = true;
    754       return true;
    755     }
    756     ctx_->has_pending_ops_ = false;
    757     return call_->cq()->Pluck(&ctx_->pending_ops_);
    758   }
    759 
    760  private:
    761   Call* const call_;
    762   ServerContext* const ctx_;
    763 };
    764 
    765 }  // namespace internal
    766 
    767 /// Synchronous (blocking) server-side API for a bidirectional
    768 /// streaming call, where the incoming message stream coming from the client has
    769 /// messages of type \a R, and the outgoing message streaming coming from
    770 /// the server has messages of type \a W.
    771 template <class W, class R>
    772 class ServerReaderWriter final : public ServerReaderWriterInterface<W, R> {
    773  public:
    774   /// See the \a ServerStreamingInterface.SendInitialMetadata method
    775   /// for semantics. Note that initial metadata will be affected by the
    776   /// \a ServerContext associated with this call.
    777   void SendInitialMetadata() override { body_.SendInitialMetadata(); }
    778 
    779   bool NextMessageSize(uint32_t* sz) override {
    780     return body_.NextMessageSize(sz);
    781   }
    782 
    783   bool Read(R* msg) override { return body_.Read(msg); }
    784 
    785   /// See the \a WriterInterface.Write(const W& msg, WriteOptions options)
    786   /// method for semantics.
    787   /// Side effect:
    788   ///   Also sends initial metadata if not already sent (using the \a
    789   ///   ServerContext associated with this call).
    790   using internal::WriterInterface<W>::Write;
    791   bool Write(const W& msg, WriteOptions options) override {
    792     return body_.Write(msg, options);
    793   }
    794 
    795  private:
    796   internal::ServerReaderWriterBody<W, R> body_;
    797 
    798   friend class internal::TemplatedBidiStreamingHandler<ServerReaderWriter<W, R>,
    799                                                        false>;
    800   ServerReaderWriter(internal::Call* call, ServerContext* ctx)
    801       : body_(call, ctx) {}
    802 };
    803 
    804 /// A class to represent a flow-controlled unary call. This is something
    805 /// of a hybrid between conventional unary and streaming. This is invoked
    806 /// through a unary call on the client side, but the server responds to it
    807 /// as though it were a single-ping-pong streaming call. The server can use
    808 /// the \a NextMessageSize method to determine an upper-bound on the size of
    809 /// the message. A key difference relative to streaming: ServerUnaryStreamer
    810 /// must have exactly 1 Read and exactly 1 Write, in that order, to function
    811 /// correctly. Otherwise, the RPC is in error.
    812 template <class RequestType, class ResponseType>
    813 class ServerUnaryStreamer final
    814     : public ServerReaderWriterInterface<ResponseType, RequestType> {
    815  public:
    816   /// Block to send initial metadata to client.
    817   /// Implicit input parameter:
    818   ///    - the \a ServerContext associated with this call will be used for
    819   ///      sending initial metadata.
    820   void SendInitialMetadata() override { body_.SendInitialMetadata(); }
    821 
    822   /// Get an upper bound on the request message size from the client.
    823   bool NextMessageSize(uint32_t* sz) override {
    824     return body_.NextMessageSize(sz);
    825   }
    826 
    827   /// Read a message of type \a R into \a msg. Completion will be notified by \a
    828   /// tag on the associated completion queue.
    829   /// This is thread-safe with respect to \a Write or \a WritesDone methods. It
    830   /// should not be called concurrently with other streaming APIs
    831   /// on the same stream. It is not meaningful to call it concurrently
    832   /// with another \a ReaderInterface::Read on the same stream since reads on
    833   /// the same stream are delivered in order.
    834   ///
    835   /// \param[out] msg Where to eventually store the read message.
    836   /// \param[in] tag The tag identifying the operation.
    837   bool Read(RequestType* request) override {
    838     if (read_done_) {
    839       return false;
    840     }
    841     read_done_ = true;
    842     return body_.Read(request);
    843   }
    844 
    845   /// Block to write \a msg to the stream with WriteOptions \a options.
    846   /// This is thread-safe with respect to \a ReaderInterface::Read
    847   ///
    848   /// \param msg The message to be written to the stream.
    849   /// \param options The WriteOptions affecting the write operation.
    850   ///
    851   /// \return \a true on success, \a false when the stream has been closed.
    852   using internal::WriterInterface<ResponseType>::Write;
    853   bool Write(const ResponseType& response, WriteOptions options) override {
    854     if (write_done_ || !read_done_) {
    855       return false;
    856     }
    857     write_done_ = true;
    858     return body_.Write(response, options);
    859   }
    860 
    861  private:
    862   internal::ServerReaderWriterBody<ResponseType, RequestType> body_;
    863   bool read_done_;
    864   bool write_done_;
    865 
    866   friend class internal::TemplatedBidiStreamingHandler<
    867       ServerUnaryStreamer<RequestType, ResponseType>, true>;
    868   ServerUnaryStreamer(internal::Call* call, ServerContext* ctx)
    869       : body_(call, ctx), read_done_(false), write_done_(false) {}
    870 };
    871 
    872 /// A class to represent a flow-controlled server-side streaming call.
    873 /// This is something of a hybrid between server-side and bidi streaming.
    874 /// This is invoked through a server-side streaming call on the client side,
    875 /// but the server responds to it as though it were a bidi streaming call that
    876 /// must first have exactly 1 Read and then any number of Writes.
    877 template <class RequestType, class ResponseType>
    878 class ServerSplitStreamer final
    879     : public ServerReaderWriterInterface<ResponseType, RequestType> {
    880  public:
    881   /// Block to send initial metadata to client.
    882   /// Implicit input parameter:
    883   ///    - the \a ServerContext associated with this call will be used for
    884   ///      sending initial metadata.
    885   void SendInitialMetadata() override { body_.SendInitialMetadata(); }
    886 
    887   /// Get an upper bound on the request message size from the client.
    888   bool NextMessageSize(uint32_t* sz) override {
    889     return body_.NextMessageSize(sz);
    890   }
    891 
    892   /// Read a message of type \a R into \a msg. Completion will be notified by \a
    893   /// tag on the associated completion queue.
    894   /// This is thread-safe with respect to \a Write or \a WritesDone methods. It
    895   /// should not be called concurrently with other streaming APIs
    896   /// on the same stream. It is not meaningful to call it concurrently
    897   /// with another \a ReaderInterface::Read on the same stream since reads on
    898   /// the same stream are delivered in order.
    899   ///
    900   /// \param[out] msg Where to eventually store the read message.
    901   /// \param[in] tag The tag identifying the operation.
    902   bool Read(RequestType* request) override {
    903     if (read_done_) {
    904       return false;
    905     }
    906     read_done_ = true;
    907     return body_.Read(request);
    908   }
    909 
    910   /// Block to write \a msg to the stream with WriteOptions \a options.
    911   /// This is thread-safe with respect to \a ReaderInterface::Read
    912   ///
    913   /// \param msg The message to be written to the stream.
    914   /// \param options The WriteOptions affecting the write operation.
    915   ///
    916   /// \return \a true on success, \a false when the stream has been closed.
    917   using internal::WriterInterface<ResponseType>::Write;
    918   bool Write(const ResponseType& response, WriteOptions options) override {
    919     return read_done_ && body_.Write(response, options);
    920   }
    921 
    922  private:
    923   internal::ServerReaderWriterBody<ResponseType, RequestType> body_;
    924   bool read_done_;
    925 
    926   friend class internal::TemplatedBidiStreamingHandler<
    927       ServerSplitStreamer<RequestType, ResponseType>, false>;
    928   ServerSplitStreamer(internal::Call* call, ServerContext* ctx)
    929       : body_(call, ctx), read_done_(false) {}
    930 };
    931 
    932 }  // namespace grpc
    933 
    934 #endif  // GRPCPP_IMPL_CODEGEN_SYNC_STREAM_H
    935