1 /* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPCPP_IMPL_CODEGEN_ASYNC_STREAM_H 20 #define GRPCPP_IMPL_CODEGEN_ASYNC_STREAM_H 21 22 #include <grpcpp/impl/codegen/call.h> 23 #include <grpcpp/impl/codegen/channel_interface.h> 24 #include <grpcpp/impl/codegen/core_codegen_interface.h> 25 #include <grpcpp/impl/codegen/server_context.h> 26 #include <grpcpp/impl/codegen/service_type.h> 27 #include <grpcpp/impl/codegen/status.h> 28 29 namespace grpc { 30 31 class CompletionQueue; 32 33 namespace internal { 34 /// Common interface for all client side asynchronous streaming. 35 class ClientAsyncStreamingInterface { 36 public: 37 virtual ~ClientAsyncStreamingInterface() {} 38 39 /// Start the call that was set up by the constructor, but only if the 40 /// constructor was invoked through the "Prepare" API which doesn't actually 41 /// start the call 42 virtual void StartCall(void* tag) = 0; 43 44 /// Request notification of the reading of the initial metadata. Completion 45 /// will be notified by \a tag on the associated completion queue. 46 /// This call is optional, but if it is used, it cannot be used concurrently 47 /// with or after the \a AsyncReaderInterface::Read method. 48 /// 49 /// \param[in] tag Tag identifying this request. 
50 virtual void ReadInitialMetadata(void* tag) = 0; 51 52 /// Indicate that the stream is to be finished and request notification for 53 /// when the call has been ended. 54 /// Should not be used concurrently with other operations. 55 /// 56 /// It is appropriate to call this method when both: 57 /// * the client side has no more message to send 58 /// (this can be declared implicitly by calling this method, or 59 /// explicitly through an earlier call to the <i>WritesDone</i> method 60 /// of the class in use, e.g. \a ClientAsyncWriterInterface::WritesDone or 61 /// \a ClientAsyncReaderWriterInterface::WritesDone). 62 /// * there are no more messages to be received from the server (this can 63 /// be known implicitly by the calling code, or explicitly from an 64 /// earlier call to \a AsyncReaderInterface::Read that yielded a failed 65 /// result, e.g. cq->Next(&read_tag, &ok) filled in 'ok' with 'false'). 66 /// 67 /// The tag will be returned when either: 68 /// - all incoming messages have been read and the server has returned 69 /// a status. 70 /// - the server has returned a non-OK status. 71 /// - the call failed for some reason and the library generated a 72 /// status. 73 /// 74 /// Note that implementations of this method attempt to receive initial 75 /// metadata from the server if initial metadata hasn't yet been received. 76 /// 77 /// \param[in] tag Tag identifying this request. 78 /// \param[out] status To be updated with the operation status. 79 virtual void Finish(Status* status, void* tag) = 0; 80 }; 81 82 /// An interface that yields a sequence of messages of type \a R. 83 template <class R> 84 class AsyncReaderInterface { 85 public: 86 virtual ~AsyncReaderInterface() {} 87 88 /// Read a message of type \a R into \a msg. Completion will be notified by \a 89 /// tag on the associated completion queue. 90 /// This is thread-safe with respect to \a Write or \a WritesDone methods. 
It 91 /// should not be called concurrently with other streaming APIs 92 /// on the same stream. It is not meaningful to call it concurrently 93 /// with another \a AsyncReaderInterface::Read on the same stream since reads 94 /// on the same stream are delivered in order. 95 /// 96 /// \param[out] msg Where to eventually store the read message. 97 /// \param[in] tag The tag identifying the operation. 98 /// 99 /// Side effect: note that this method attempt to receive initial metadata for 100 /// a stream if it hasn't yet been received. 101 virtual void Read(R* msg, void* tag) = 0; 102 }; 103 104 /// An interface that can be fed a sequence of messages of type \a W. 105 template <class W> 106 class AsyncWriterInterface { 107 public: 108 virtual ~AsyncWriterInterface() {} 109 110 /// Request the writing of \a msg with identifying tag \a tag. 111 /// 112 /// Only one write may be outstanding at any given time. This means that 113 /// after calling Write, one must wait to receive \a tag from the completion 114 /// queue BEFORE calling Write again. 115 /// This is thread-safe with respect to \a AsyncReaderInterface::Read 116 /// 117 /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to 118 /// to deallocate once Write returns. 119 /// 120 /// \param[in] msg The message to be written. 121 /// \param[in] tag The tag identifying the operation. 122 virtual void Write(const W& msg, void* tag) = 0; 123 124 /// Request the writing of \a msg using WriteOptions \a options with 125 /// identifying tag \a tag. 126 /// 127 /// Only one write may be outstanding at any given time. This means that 128 /// after calling Write, one must wait to receive \a tag from the completion 129 /// queue BEFORE calling Write again. 130 /// WriteOptions \a options is used to set the write options of this message. 
131 /// This is thread-safe with respect to \a AsyncReaderInterface::Read 132 /// 133 /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to 134 /// to deallocate once Write returns. 135 /// 136 /// \param[in] msg The message to be written. 137 /// \param[in] options The WriteOptions to be used to write this message. 138 /// \param[in] tag The tag identifying the operation. 139 virtual void Write(const W& msg, WriteOptions options, void* tag) = 0; 140 141 /// Request the writing of \a msg and coalesce it with the writing 142 /// of trailing metadata, using WriteOptions \a options with 143 /// identifying tag \a tag. 144 /// 145 /// For client, WriteLast is equivalent of performing Write and 146 /// WritesDone in a single step. 147 /// For server, WriteLast buffers the \a msg. The writing of \a msg is held 148 /// until Finish is called, where \a msg and trailing metadata are coalesced 149 /// and write is initiated. Note that WriteLast can only buffer \a msg up to 150 /// the flow control window size. If \a msg size is larger than the window 151 /// size, it will be sent on wire without buffering. 152 /// 153 /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to 154 /// to deallocate once Write returns. 155 /// 156 /// \param[in] msg The message to be written. 157 /// \param[in] options The WriteOptions to be used to write this message. 158 /// \param[in] tag The tag identifying the operation. 159 void WriteLast(const W& msg, WriteOptions options, void* tag) { 160 Write(msg, options.set_last_message(), tag); 161 } 162 }; 163 164 } // namespace internal 165 166 template <class R> 167 class ClientAsyncReaderInterface 168 : public internal::ClientAsyncStreamingInterface, 169 public internal::AsyncReaderInterface<R> {}; 170 171 namespace internal { 172 template <class R> 173 class ClientAsyncReaderFactory { 174 public: 175 /// Create a stream object. 176 /// Write the first request out if \a start is set. 
177 /// \a tag will be notified on \a cq when the call has been started and 178 /// \a request has been written out. If \a start is not set, \a tag must be 179 /// nullptr and the actual call must be initiated by StartCall 180 /// Note that \a context will be used to fill in custom initial metadata 181 /// used to send to the server when starting the call. 182 template <class W> 183 static ClientAsyncReader<R>* Create(ChannelInterface* channel, 184 CompletionQueue* cq, 185 const ::grpc::internal::RpcMethod& method, 186 ClientContext* context, const W& request, 187 bool start, void* tag) { 188 ::grpc::internal::Call call = channel->CreateCall(method, context, cq); 189 return new (g_core_codegen_interface->grpc_call_arena_alloc( 190 call.call(), sizeof(ClientAsyncReader<R>))) 191 ClientAsyncReader<R>(call, context, request, start, tag); 192 } 193 }; 194 } // namespace internal 195 196 /// Async client-side API for doing server-streaming RPCs, 197 /// where the incoming message stream coming from the server has 198 /// messages of type \a R. 199 template <class R> 200 class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { 201 public: 202 // always allocated against a call arena, no memory free required 203 static void operator delete(void* ptr, std::size_t size) { 204 assert(size == sizeof(ClientAsyncReader)); 205 } 206 207 // This operator should never be called as the memory should be freed as part 208 // of the arena destruction. It only exists to provide a matching operator 209 // delete to the operator new so that some compilers will not complain (see 210 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this 211 // there are no tests catching the compiler warning. 
212 static void operator delete(void*, void*) { assert(0); } 213 214 void StartCall(void* tag) override { 215 assert(!started_); 216 started_ = true; 217 StartCallInternal(tag); 218 } 219 220 /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata 221 /// method for semantics. 222 /// 223 /// Side effect: 224 /// - upon receiving initial metadata from the server, 225 /// the \a ClientContext associated with this call is updated, and the 226 /// calling code can access the received metadata through the 227 /// \a ClientContext. 228 void ReadInitialMetadata(void* tag) override { 229 assert(started_); 230 GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); 231 232 meta_ops_.set_output_tag(tag); 233 meta_ops_.RecvInitialMetadata(context_); 234 call_.PerformOps(&meta_ops_); 235 } 236 237 void Read(R* msg, void* tag) override { 238 assert(started_); 239 read_ops_.set_output_tag(tag); 240 if (!context_->initial_metadata_received_) { 241 read_ops_.RecvInitialMetadata(context_); 242 } 243 read_ops_.RecvMessage(msg); 244 call_.PerformOps(&read_ops_); 245 } 246 247 /// See the \a ClientAsyncStreamingInterface.Finish method for semantics. 248 /// 249 /// Side effect: 250 /// - the \a ClientContext associated with this call is updated with 251 /// possible initial and trailing metadata received from the server. 
252 void Finish(Status* status, void* tag) override { 253 assert(started_); 254 finish_ops_.set_output_tag(tag); 255 if (!context_->initial_metadata_received_) { 256 finish_ops_.RecvInitialMetadata(context_); 257 } 258 finish_ops_.ClientRecvStatus(context_, status); 259 call_.PerformOps(&finish_ops_); 260 } 261 262 private: 263 friend class internal::ClientAsyncReaderFactory<R>; 264 template <class W> 265 ClientAsyncReader(::grpc::internal::Call call, ClientContext* context, 266 const W& request, bool start, void* tag) 267 : context_(context), call_(call), started_(start) { 268 // TODO(ctiller): don't assert 269 GPR_CODEGEN_ASSERT(init_ops_.SendMessage(request).ok()); 270 init_ops_.ClientSendClose(); 271 if (start) { 272 StartCallInternal(tag); 273 } else { 274 assert(tag == nullptr); 275 } 276 } 277 278 void StartCallInternal(void* tag) { 279 init_ops_.SendInitialMetadata(context_->send_initial_metadata_, 280 context_->initial_metadata_flags()); 281 init_ops_.set_output_tag(tag); 282 call_.PerformOps(&init_ops_); 283 } 284 285 ClientContext* context_; 286 ::grpc::internal::Call call_; 287 bool started_; 288 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 289 ::grpc::internal::CallOpSendMessage, 290 ::grpc::internal::CallOpClientSendClose> 291 init_ops_; 292 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> 293 meta_ops_; 294 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, 295 ::grpc::internal::CallOpRecvMessage<R>> 296 read_ops_; 297 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, 298 ::grpc::internal::CallOpClientRecvStatus> 299 finish_ops_; 300 }; 301 302 /// Common interface for client side asynchronous writing. 303 template <class W> 304 class ClientAsyncWriterInterface 305 : public internal::ClientAsyncStreamingInterface, 306 public internal::AsyncWriterInterface<W> { 307 public: 308 /// Signal the client is done with the writes (half-close the client stream). 
309 /// Thread-safe with respect to \a AsyncReaderInterface::Read 310 /// 311 /// \param[in] tag The tag identifying the operation. 312 virtual void WritesDone(void* tag) = 0; 313 }; 314 315 namespace internal { 316 template <class W> 317 class ClientAsyncWriterFactory { 318 public: 319 /// Create a stream object. 320 /// Start the RPC if \a start is set 321 /// \a tag will be notified on \a cq when the call has been started (i.e. 322 /// intitial metadata sent) and \a request has been written out. 323 /// If \a start is not set, \a tag must be nullptr and the actual call 324 /// must be initiated by StartCall 325 /// Note that \a context will be used to fill in custom initial metadata 326 /// used to send to the server when starting the call. 327 /// \a response will be filled in with the single expected response 328 /// message from the server upon a successful call to the \a Finish 329 /// method of this instance. 330 template <class R> 331 static ClientAsyncWriter<W>* Create(ChannelInterface* channel, 332 CompletionQueue* cq, 333 const ::grpc::internal::RpcMethod& method, 334 ClientContext* context, R* response, 335 bool start, void* tag) { 336 ::grpc::internal::Call call = channel->CreateCall(method, context, cq); 337 return new (g_core_codegen_interface->grpc_call_arena_alloc( 338 call.call(), sizeof(ClientAsyncWriter<W>))) 339 ClientAsyncWriter<W>(call, context, response, start, tag); 340 } 341 }; 342 } // namespace internal 343 344 /// Async API on the client side for doing client-streaming RPCs, 345 /// where the outgoing message stream going to the server contains 346 /// messages of type \a W. 
347 template <class W> 348 class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { 349 public: 350 // always allocated against a call arena, no memory free required 351 static void operator delete(void* ptr, std::size_t size) { 352 assert(size == sizeof(ClientAsyncWriter)); 353 } 354 355 // This operator should never be called as the memory should be freed as part 356 // of the arena destruction. It only exists to provide a matching operator 357 // delete to the operator new so that some compilers will not complain (see 358 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this 359 // there are no tests catching the compiler warning. 360 static void operator delete(void*, void*) { assert(0); } 361 362 void StartCall(void* tag) override { 363 assert(!started_); 364 started_ = true; 365 StartCallInternal(tag); 366 } 367 368 /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata method for 369 /// semantics. 370 /// 371 /// Side effect: 372 /// - upon receiving initial metadata from the server, the \a ClientContext 373 /// associated with this call is updated, and the calling code can access 374 /// the received metadata through the \a ClientContext. 
375 void ReadInitialMetadata(void* tag) override { 376 assert(started_); 377 GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); 378 379 meta_ops_.set_output_tag(tag); 380 meta_ops_.RecvInitialMetadata(context_); 381 call_.PerformOps(&meta_ops_); 382 } 383 384 void Write(const W& msg, void* tag) override { 385 assert(started_); 386 write_ops_.set_output_tag(tag); 387 // TODO(ctiller): don't assert 388 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); 389 call_.PerformOps(&write_ops_); 390 } 391 392 void Write(const W& msg, WriteOptions options, void* tag) override { 393 assert(started_); 394 write_ops_.set_output_tag(tag); 395 if (options.is_last_message()) { 396 options.set_buffer_hint(); 397 write_ops_.ClientSendClose(); 398 } 399 // TODO(ctiller): don't assert 400 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); 401 call_.PerformOps(&write_ops_); 402 } 403 404 void WritesDone(void* tag) override { 405 assert(started_); 406 write_ops_.set_output_tag(tag); 407 write_ops_.ClientSendClose(); 408 call_.PerformOps(&write_ops_); 409 } 410 411 /// See the \a ClientAsyncStreamingInterface.Finish method for semantics. 412 /// 413 /// Side effect: 414 /// - the \a ClientContext associated with this call is updated with 415 /// possible initial and trailing metadata received from the server. 416 /// - attempts to fill in the \a response parameter passed to this class's 417 /// constructor with the server's response message. 
418 void Finish(Status* status, void* tag) override { 419 assert(started_); 420 finish_ops_.set_output_tag(tag); 421 if (!context_->initial_metadata_received_) { 422 finish_ops_.RecvInitialMetadata(context_); 423 } 424 finish_ops_.ClientRecvStatus(context_, status); 425 call_.PerformOps(&finish_ops_); 426 } 427 428 private: 429 friend class internal::ClientAsyncWriterFactory<W>; 430 template <class R> 431 ClientAsyncWriter(::grpc::internal::Call call, ClientContext* context, 432 R* response, bool start, void* tag) 433 : context_(context), call_(call), started_(start) { 434 finish_ops_.RecvMessage(response); 435 finish_ops_.AllowNoMessage(); 436 if (start) { 437 StartCallInternal(tag); 438 } else { 439 assert(tag == nullptr); 440 } 441 } 442 443 void StartCallInternal(void* tag) { 444 write_ops_.SendInitialMetadata(context_->send_initial_metadata_, 445 context_->initial_metadata_flags()); 446 // if corked bit is set in context, we just keep the initial metadata 447 // buffered up to coalesce with later message send. No op is performed. 448 if (!context_->initial_metadata_corked_) { 449 write_ops_.set_output_tag(tag); 450 call_.PerformOps(&write_ops_); 451 } 452 } 453 454 ClientContext* context_; 455 ::grpc::internal::Call call_; 456 bool started_; 457 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> 458 meta_ops_; 459 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 460 ::grpc::internal::CallOpSendMessage, 461 ::grpc::internal::CallOpClientSendClose> 462 write_ops_; 463 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, 464 ::grpc::internal::CallOpGenericRecvMessage, 465 ::grpc::internal::CallOpClientRecvStatus> 466 finish_ops_; 467 }; 468 469 /// Async client-side interface for bi-directional streaming, 470 /// where the client-to-server message stream has messages of type \a W, 471 /// and the server-to-client message stream has messages of type \a R. 
472 template <class W, class R> 473 class ClientAsyncReaderWriterInterface 474 : public internal::ClientAsyncStreamingInterface, 475 public internal::AsyncWriterInterface<W>, 476 public internal::AsyncReaderInterface<R> { 477 public: 478 /// Signal the client is done with the writes (half-close the client stream). 479 /// Thread-safe with respect to \a AsyncReaderInterface::Read 480 /// 481 /// \param[in] tag The tag identifying the operation. 482 virtual void WritesDone(void* tag) = 0; 483 }; 484 485 namespace internal { 486 template <class W, class R> 487 class ClientAsyncReaderWriterFactory { 488 public: 489 /// Create a stream object. 490 /// Start the RPC request if \a start is set. 491 /// \a tag will be notified on \a cq when the call has been started (i.e. 492 /// intitial metadata sent). If \a start is not set, \a tag must be 493 /// nullptr and the actual call must be initiated by StartCall 494 /// Note that \a context will be used to fill in custom initial metadata 495 /// used to send to the server when starting the call. 496 static ClientAsyncReaderWriter<W, R>* Create( 497 ChannelInterface* channel, CompletionQueue* cq, 498 const ::grpc::internal::RpcMethod& method, ClientContext* context, 499 bool start, void* tag) { 500 ::grpc::internal::Call call = channel->CreateCall(method, context, cq); 501 502 return new (g_core_codegen_interface->grpc_call_arena_alloc( 503 call.call(), sizeof(ClientAsyncReaderWriter<W, R>))) 504 ClientAsyncReaderWriter<W, R>(call, context, start, tag); 505 } 506 }; 507 } // namespace internal 508 509 /// Async client-side interface for bi-directional streaming, 510 /// where the outgoing message stream going to the server 511 /// has messages of type \a W, and the incoming message stream coming 512 /// from the server has messages of type \a R. 
513 template <class W, class R> 514 class ClientAsyncReaderWriter final 515 : public ClientAsyncReaderWriterInterface<W, R> { 516 public: 517 // always allocated against a call arena, no memory free required 518 static void operator delete(void* ptr, std::size_t size) { 519 assert(size == sizeof(ClientAsyncReaderWriter)); 520 } 521 522 // This operator should never be called as the memory should be freed as part 523 // of the arena destruction. It only exists to provide a matching operator 524 // delete to the operator new so that some compilers will not complain (see 525 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this 526 // there are no tests catching the compiler warning. 527 static void operator delete(void*, void*) { assert(0); } 528 529 void StartCall(void* tag) override { 530 assert(!started_); 531 started_ = true; 532 StartCallInternal(tag); 533 } 534 535 /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata method 536 /// for semantics of this method. 537 /// 538 /// Side effect: 539 /// - upon receiving initial metadata from the server, the \a ClientContext 540 /// is updated with it, and then the receiving initial metadata can 541 /// be accessed through this \a ClientContext. 
542 void ReadInitialMetadata(void* tag) override { 543 assert(started_); 544 GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); 545 546 meta_ops_.set_output_tag(tag); 547 meta_ops_.RecvInitialMetadata(context_); 548 call_.PerformOps(&meta_ops_); 549 } 550 551 void Read(R* msg, void* tag) override { 552 assert(started_); 553 read_ops_.set_output_tag(tag); 554 if (!context_->initial_metadata_received_) { 555 read_ops_.RecvInitialMetadata(context_); 556 } 557 read_ops_.RecvMessage(msg); 558 call_.PerformOps(&read_ops_); 559 } 560 561 void Write(const W& msg, void* tag) override { 562 assert(started_); 563 write_ops_.set_output_tag(tag); 564 // TODO(ctiller): don't assert 565 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); 566 call_.PerformOps(&write_ops_); 567 } 568 569 void Write(const W& msg, WriteOptions options, void* tag) override { 570 assert(started_); 571 write_ops_.set_output_tag(tag); 572 if (options.is_last_message()) { 573 options.set_buffer_hint(); 574 write_ops_.ClientSendClose(); 575 } 576 // TODO(ctiller): don't assert 577 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); 578 call_.PerformOps(&write_ops_); 579 } 580 581 void WritesDone(void* tag) override { 582 assert(started_); 583 write_ops_.set_output_tag(tag); 584 write_ops_.ClientSendClose(); 585 call_.PerformOps(&write_ops_); 586 } 587 588 /// See the \a ClientAsyncStreamingInterface.Finish method for semantics. 589 /// Side effect 590 /// - the \a ClientContext associated with this call is updated with 591 /// possible initial and trailing metadata sent from the server. 
592 void Finish(Status* status, void* tag) override { 593 assert(started_); 594 finish_ops_.set_output_tag(tag); 595 if (!context_->initial_metadata_received_) { 596 finish_ops_.RecvInitialMetadata(context_); 597 } 598 finish_ops_.ClientRecvStatus(context_, status); 599 call_.PerformOps(&finish_ops_); 600 } 601 602 private: 603 friend class internal::ClientAsyncReaderWriterFactory<W, R>; 604 ClientAsyncReaderWriter(::grpc::internal::Call call, ClientContext* context, 605 bool start, void* tag) 606 : context_(context), call_(call), started_(start) { 607 if (start) { 608 StartCallInternal(tag); 609 } else { 610 assert(tag == nullptr); 611 } 612 } 613 614 void StartCallInternal(void* tag) { 615 write_ops_.SendInitialMetadata(context_->send_initial_metadata_, 616 context_->initial_metadata_flags()); 617 // if corked bit is set in context, we just keep the initial metadata 618 // buffered up to coalesce with later message send. No op is performed. 619 if (!context_->initial_metadata_corked_) { 620 write_ops_.set_output_tag(tag); 621 call_.PerformOps(&write_ops_); 622 } 623 } 624 625 ClientContext* context_; 626 ::grpc::internal::Call call_; 627 bool started_; 628 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> 629 meta_ops_; 630 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, 631 ::grpc::internal::CallOpRecvMessage<R>> 632 read_ops_; 633 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 634 ::grpc::internal::CallOpSendMessage, 635 ::grpc::internal::CallOpClientSendClose> 636 write_ops_; 637 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, 638 ::grpc::internal::CallOpClientRecvStatus> 639 finish_ops_; 640 }; 641 642 template <class W, class R> 643 class ServerAsyncReaderInterface 644 : public internal::ServerAsyncStreamingInterface, 645 public internal::AsyncReaderInterface<R> { 646 public: 647 /// Indicate that the stream is to be finished with a certain status code 648 /// 
and also send out \a msg response to the client. 649 /// Request notification for when the server has sent the response and the 650 /// appropriate signals to the client to end the call. 651 /// Should not be used concurrently with other operations. 652 /// 653 /// It is appropriate to call this method when: 654 /// * all messages from the client have been received (either known 655 /// implictly, or explicitly because a previous 656 /// \a AsyncReaderInterface::Read operation with a non-ok result, 657 /// e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false'). 658 /// 659 /// This operation will end when the server has finished sending out initial 660 /// metadata (if not sent already), response message, and status, or if 661 /// some failure occurred when trying to do so. 662 /// 663 /// gRPC doesn't take ownership or a reference to \a msg or \a status, so it 664 /// is safe to to deallocate once Finish returns. 665 /// 666 /// \param[in] tag Tag identifying this request. 667 /// \param[in] status To be sent to the client as the result of this call. 668 /// \param[in] msg To be sent to the client as the response for this call. 669 virtual void Finish(const W& msg, const Status& status, void* tag) = 0; 670 671 /// Indicate that the stream is to be finished with a certain 672 /// non-OK status code. 673 /// Request notification for when the server has sent the appropriate 674 /// signals to the client to end the call. 675 /// Should not be used concurrently with other operations. 676 /// 677 /// This call is meant to end the call with some error, and can be called at 678 /// any point that the server would like to "fail" the call (though note 679 /// this shouldn't be called concurrently with any other "sending" call, like 680 /// \a AsyncWriterInterface::Write). 681 /// 682 /// This operation will end when the server has finished sending out initial 683 /// metadata (if not sent already), and status, or if some failure occurred 684 /// when trying to do so. 
685 /// 686 /// gRPC doesn't take ownership or a reference to \a status, so it is safe to 687 /// to deallocate once FinishWithError returns. 688 /// 689 /// \param[in] tag Tag identifying this request. 690 /// \param[in] status To be sent to the client as the result of this call. 691 /// - Note: \a status must have a non-OK code. 692 virtual void FinishWithError(const Status& status, void* tag) = 0; 693 }; 694 695 /// Async server-side API for doing client-streaming RPCs, 696 /// where the incoming message stream from the client has messages of type \a R, 697 /// and the single response message sent from the server is type \a W. 698 template <class W, class R> 699 class ServerAsyncReader final : public ServerAsyncReaderInterface<W, R> { 700 public: 701 explicit ServerAsyncReader(ServerContext* ctx) 702 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} 703 704 /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics. 705 /// 706 /// Implicit input parameter: 707 /// - The initial metadata that will be sent to the client from this op will 708 /// be taken from the \a ServerContext associated with the call. 709 void SendInitialMetadata(void* tag) override { 710 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); 711 712 meta_ops_.set_output_tag(tag); 713 meta_ops_.SendInitialMetadata(ctx_->initial_metadata_, 714 ctx_->initial_metadata_flags()); 715 if (ctx_->compression_level_set()) { 716 meta_ops_.set_compression_level(ctx_->compression_level()); 717 } 718 ctx_->sent_initial_metadata_ = true; 719 call_.PerformOps(&meta_ops_); 720 } 721 722 void Read(R* msg, void* tag) override { 723 read_ops_.set_output_tag(tag); 724 read_ops_.RecvMessage(msg); 725 call_.PerformOps(&read_ops_); 726 } 727 728 /// See the \a ServerAsyncReaderInterface.Read method for semantics 729 /// 730 /// Side effect: 731 /// - also sends initial metadata if not alreay sent. 
732 /// - uses the \a ServerContext associated with this call to send possible 733 /// initial and trailing metadata. 734 /// 735 /// Note: \a msg is not sent if \a status has a non-OK code. 736 /// 737 /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it 738 /// is safe to to deallocate once Finish returns. 739 void Finish(const W& msg, const Status& status, void* tag) override { 740 finish_ops_.set_output_tag(tag); 741 if (!ctx_->sent_initial_metadata_) { 742 finish_ops_.SendInitialMetadata(ctx_->initial_metadata_, 743 ctx_->initial_metadata_flags()); 744 if (ctx_->compression_level_set()) { 745 finish_ops_.set_compression_level(ctx_->compression_level()); 746 } 747 ctx_->sent_initial_metadata_ = true; 748 } 749 // The response is dropped if the status is not OK. 750 if (status.ok()) { 751 finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, 752 finish_ops_.SendMessage(msg)); 753 } else { 754 finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); 755 } 756 call_.PerformOps(&finish_ops_); 757 } 758 759 /// See the \a ServerAsyncReaderInterface.Read method for semantics 760 /// 761 /// Side effect: 762 /// - also sends initial metadata if not alreay sent. 763 /// - uses the \a ServerContext associated with this call to send possible 764 /// initial and trailing metadata. 765 /// 766 /// gRPC doesn't take ownership or a reference to \a status, so it is safe to 767 /// to deallocate once FinishWithError returns. 
768 void FinishWithError(const Status& status, void* tag) override { 769 GPR_CODEGEN_ASSERT(!status.ok()); 770 finish_ops_.set_output_tag(tag); 771 if (!ctx_->sent_initial_metadata_) { 772 finish_ops_.SendInitialMetadata(ctx_->initial_metadata_, 773 ctx_->initial_metadata_flags()); 774 if (ctx_->compression_level_set()) { 775 finish_ops_.set_compression_level(ctx_->compression_level()); 776 } 777 ctx_->sent_initial_metadata_ = true; 778 } 779 finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); 780 call_.PerformOps(&finish_ops_); 781 } 782 783 private: 784 void BindCall(::grpc::internal::Call* call) override { call_ = *call; } 785 786 ::grpc::internal::Call call_; 787 ServerContext* ctx_; 788 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> 789 meta_ops_; 790 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> read_ops_; 791 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 792 ::grpc::internal::CallOpSendMessage, 793 ::grpc::internal::CallOpServerSendStatus> 794 finish_ops_; 795 }; 796 797 template <class W> 798 class ServerAsyncWriterInterface 799 : public internal::ServerAsyncStreamingInterface, 800 public internal::AsyncWriterInterface<W> { 801 public: 802 /// Indicate that the stream is to be finished with a certain status code. 803 /// Request notification for when the server has sent the appropriate 804 /// signals to the client to end the call. 805 /// Should not be used concurrently with other operations. 806 /// 807 /// It is appropriate to call this method when either: 808 /// * all messages from the client have been received (either known 809 /// implictly, or explicitly because a previous \a 810 /// AsyncReaderInterface::Read operation with a non-ok 811 /// result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false'. 812 /// * it is desired to end the call early with some non-OK status code. 
///
  /// This operation will end when the server has finished sending out initial
  /// metadata (if not sent already), response message, and status, or if
  /// some failure occurred when trying to do so.
  ///
  /// gRPC doesn't take ownership or a reference to \a status, so it is safe
  /// to deallocate once Finish returns.
  ///
  /// \param[in] tag Tag identifying this request.
  /// \param[in] status To be sent to the client as the result of this call.
  virtual void Finish(const Status& status, void* tag) = 0;

  /// Request the writing of \a msg and coalesce it with trailing metadata which
  /// contains \a status, using WriteOptions options with
  /// identifying tag \a tag.
  ///
  /// WriteAndFinish is equivalent to performing WriteLast and Finish
  /// in a single step.
  ///
  /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it
  /// is safe to deallocate once WriteAndFinish returns.
  ///
  /// \param[in] msg The message to be written.
  /// \param[in] options The WriteOptions to be used to write this message.
  /// \param[in] status The Status that server returns to client.
  /// \param[in] tag The tag identifying the operation.
  virtual void WriteAndFinish(const W& msg, WriteOptions options,
                              const Status& status, void* tag) = 0;
};

/// Async server-side API for doing server streaming RPCs,
/// where the outgoing message stream from the server has messages of type \a W.
template <class W>
class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> {
 public:
  /// Creates a writer that reads its metadata and compression settings from
  /// \a ctx. The internal call starts out as a placeholder built from null
  /// pointers and is replaced when BindCall is invoked.
  ///
  /// \param[in] ctx Server context associated with the call.
  explicit ServerAsyncWriter(ServerContext* ctx)
      : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}

  /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
///
  /// Implicit input parameter:
  ///   - The initial metadata that will be sent to the client from this op will
  ///     be taken from the \a ServerContext associated with the call.
  ///
  /// \param[in] tag Tag identifying this request.
  void SendInitialMetadata(void* tag) override {
    // Initial metadata must not have been sent (or piggybacked) already.
    GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);

    meta_ops_.set_output_tag(tag);
    meta_ops_.SendInitialMetadata(ctx_->initial_metadata_,
                                  ctx_->initial_metadata_flags());
    if (ctx_->compression_level_set()) {
      meta_ops_.set_compression_level(ctx_->compression_level());
    }
    // Recorded on the context so later writes do not send it again.
    ctx_->sent_initial_metadata_ = true;
    call_.PerformOps(&meta_ops_);
  }

  /// Request the writing of \a msg with identifying tag \a tag.
  ///
  /// Side effect: initial metadata is added to the same op set if it has not
  /// been sent yet. A failing SendMessage status trips an assertion.
  ///
  /// \param[in] msg The message to be written.
  /// \param[in] tag The tag identifying the operation.
  void Write(const W& msg, void* tag) override {
    write_ops_.set_output_tag(tag);
    EnsureInitialMetadataSent(&write_ops_);
    // TODO(ctiller): don't assert
    GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
    call_.PerformOps(&write_ops_);
  }

  /// Request the writing of \a msg using WriteOptions \a options with
  /// identifying tag \a tag.
  ///
  /// Side effect: initial metadata is added to the same op set if it has not
  /// been sent yet. A failing SendMessage status trips an assertion.
  ///
  /// \param[in] msg The message to be written.
  /// \param[in] options The WriteOptions to be used to write this message.
  /// \param[in] tag The tag identifying the operation.
  void Write(const W& msg, WriteOptions options, void* tag) override {
    write_ops_.set_output_tag(tag);
    // A message flagged as the last one additionally gets the buffer hint set
    // on the local copy of the options.
    if (options.is_last_message()) {
      options.set_buffer_hint();
    }

    EnsureInitialMetadataSent(&write_ops_);
    // TODO(ctiller): don't assert
    GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
    call_.PerformOps(&write_ops_);
  }

  /// See the \a ServerAsyncWriterInterface.WriteAndFinish method for semantics.
  ///
  /// Implicit input parameter:
  ///   - the \a ServerContext associated with this call is used
  ///     for sending trailing (and initial) metadata to the client.
  ///
  /// Note: \a status must have an OK code.
  ///
  /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it
  /// is safe to deallocate once WriteAndFinish returns.
901 void WriteAndFinish(const W& msg, WriteOptions options, const Status& status, 902 void* tag) override { 903 write_ops_.set_output_tag(tag); 904 EnsureInitialMetadataSent(&write_ops_); 905 options.set_buffer_hint(); 906 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); 907 write_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); 908 call_.PerformOps(&write_ops_); 909 } 910 911 /// See the \a ServerAsyncWriterInterface.Finish method for semantics. 912 /// 913 /// Implicit input parameter: 914 /// - the \a ServerContext associated with this call is used for sending 915 /// trailing (and initial if not already sent) metadata to the client. 916 /// 917 /// Note: there are no restrictions are the code of 918 /// \a status,it may be non-OK 919 /// 920 /// gRPC doesn't take ownership or a reference to \a status, so it is safe to 921 /// to deallocate once Finish returns. 922 void Finish(const Status& status, void* tag) override { 923 finish_ops_.set_output_tag(tag); 924 EnsureInitialMetadataSent(&finish_ops_); 925 finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); 926 call_.PerformOps(&finish_ops_); 927 } 928 929 private: 930 void BindCall(::grpc::internal::Call* call) override { call_ = *call; } 931 932 template <class T> 933 void EnsureInitialMetadataSent(T* ops) { 934 if (!ctx_->sent_initial_metadata_) { 935 ops->SendInitialMetadata(ctx_->initial_metadata_, 936 ctx_->initial_metadata_flags()); 937 if (ctx_->compression_level_set()) { 938 ops->set_compression_level(ctx_->compression_level()); 939 } 940 ctx_->sent_initial_metadata_ = true; 941 } 942 } 943 944 ::grpc::internal::Call call_; 945 ServerContext* ctx_; 946 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> 947 meta_ops_; 948 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 949 ::grpc::internal::CallOpSendMessage, 950 ::grpc::internal::CallOpServerSendStatus> 951 write_ops_; 952 
// Op set used by Finish: initial metadata (when still pending) plus the
  // final status; it has no send-message op.
  ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
                              ::grpc::internal::CallOpServerSendStatus>
      finish_ops_;
};

/// Server-side interface for asynchronous bi-directional streaming.
///
/// \a W is the type of the messages written by the server and \a R the type
/// of the messages read from the client.
template <class W, class R>
class ServerAsyncReaderWriterInterface
    : public internal::ServerAsyncStreamingInterface,
      public internal::AsyncWriterInterface<W>,
      public internal::AsyncReaderInterface<R> {
 public:
  /// Indicate that the stream is to be finished with a certain status code.
  /// Request notification for when the server has sent the appropriate
  /// signals to the client to end the call.
  /// Should not be used concurrently with other operations.
  ///
  /// It is appropriate to call this method when either:
  ///   * all messages from the client have been received (either known
  ///     implicitly, or explicitly because a previous \a
  ///     AsyncReaderInterface::Read operation completed
  ///     with a non-ok result, e.g., cq->Next(&read_tag, &ok) filled in 'ok'
  ///     with 'false').
  ///   * it is desired to end the call early with some non-OK status code.
  ///
  /// This operation will end when the server has finished sending out initial
  /// metadata (if not sent already), response message, and status, or if some
  /// failure occurred when trying to do so.
  ///
  /// gRPC doesn't take ownership or a reference to \a status, so it is safe
  /// to deallocate once Finish returns.
  ///
  /// \param[in] tag Tag identifying this request.
  /// \param[in] status To be sent to the client as the result of this call.
  virtual void Finish(const Status& status, void* tag) = 0;

  /// Request the writing of \a msg and coalesce it with trailing metadata which
  /// contains \a status, using WriteOptions options with
  /// identifying tag \a tag.
  ///
  /// WriteAndFinish is equivalent to performing WriteLast and Finish in a
  /// single step.
///
  /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it
  /// is safe to deallocate once WriteAndFinish returns.
  ///
  /// \param[in] msg The message to be written.
  /// \param[in] options The WriteOptions to be used to write this message.
  /// \param[in] status The Status that server returns to client.
  /// \param[in] tag The tag identifying the operation.
  virtual void WriteAndFinish(const W& msg, WriteOptions options,
                              const Status& status, void* tag) = 0;
};

/// Async server-side API for doing bidirectional streaming RPCs,
/// where the incoming message stream coming from the client has messages of
/// type \a R, and the outgoing message stream coming from the server has
/// messages of type \a W.
template <class W, class R>
class ServerAsyncReaderWriter final
    : public ServerAsyncReaderWriterInterface<W, R> {
 public:
  /// Creates a reader-writer that reads its metadata and compression settings
  /// from \a ctx. The internal call starts out as a placeholder built from
  /// null pointers and is replaced when BindCall is invoked.
  ///
  /// \param[in] ctx Server context associated with the call.
  explicit ServerAsyncReaderWriter(ServerContext* ctx)
      : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}

  /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
  ///
  /// Implicit input parameter:
  ///   - The initial metadata that will be sent to the client from this op will
  ///     be taken from the \a ServerContext associated with the call.
  ///
  /// \param[in] tag Tag identifying this request.
void SendInitialMetadata(void* tag) override {
    // Initial metadata must not have been sent (or piggybacked) already.
    GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);

    meta_ops_.set_output_tag(tag);
    meta_ops_.SendInitialMetadata(ctx_->initial_metadata_,
                                  ctx_->initial_metadata_flags());
    if (ctx_->compression_level_set()) {
      meta_ops_.set_compression_level(ctx_->compression_level());
    }
    // Recorded on the context so later writes do not send it again.
    ctx_->sent_initial_metadata_ = true;
    call_.PerformOps(&meta_ops_);
  }

  /// Request the reading of a message into \a msg with identifying tag
  /// \a tag.
  ///
  /// \param[out] msg Where the received message is to be stored.
  /// \param[in] tag The tag identifying the operation.
  void Read(R* msg, void* tag) override {
    read_ops_.set_output_tag(tag);
    read_ops_.RecvMessage(msg);
    call_.PerformOps(&read_ops_);
  }

  /// Request the writing of \a msg with identifying tag \a tag.
  ///
  /// Side effect: initial metadata is added to the same op set if it has not
  /// been sent yet. A failing SendMessage status trips an assertion.
  ///
  /// \param[in] msg The message to be written.
  /// \param[in] tag The tag identifying the operation.
  void Write(const W& msg, void* tag) override {
    write_ops_.set_output_tag(tag);
    EnsureInitialMetadataSent(&write_ops_);
    // TODO(ctiller): don't assert
    GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
    call_.PerformOps(&write_ops_);
  }

  /// Request the writing of \a msg using WriteOptions \a options with
  /// identifying tag \a tag.
  ///
  /// Side effect: initial metadata is added to the same op set if it has not
  /// been sent yet. A failing SendMessage status trips an assertion.
  ///
  /// \param[in] msg The message to be written.
  /// \param[in] options The WriteOptions to be used to write this message.
  /// \param[in] tag The tag identifying the operation.
  void Write(const W& msg, WriteOptions options, void* tag) override {
    write_ops_.set_output_tag(tag);
    // A message flagged as the last one additionally gets the buffer hint set
    // on the local copy of the options.
    if (options.is_last_message()) {
      options.set_buffer_hint();
    }
    EnsureInitialMetadataSent(&write_ops_);
    GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
    call_.PerformOps(&write_ops_);
  }

  /// See the \a ServerAsyncReaderWriterInterface.WriteAndFinish
  /// method for semantics.
  ///
  /// Implicit input parameter:
  ///   - the \a ServerContext associated with this call is used
  ///     for sending trailing (and initial) metadata to the client.
  ///
  /// Note: \a status must have an OK code.
  ///
  /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it
  /// is safe to deallocate once WriteAndFinish returns.
1072 void WriteAndFinish(const W& msg, WriteOptions options, const Status& status, 1073 void* tag) override { 1074 write_ops_.set_output_tag(tag); 1075 EnsureInitialMetadataSent(&write_ops_); 1076 options.set_buffer_hint(); 1077 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); 1078 write_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); 1079 call_.PerformOps(&write_ops_); 1080 } 1081 1082 /// See the \a ServerAsyncReaderWriterInterface.Finish method for semantics. 1083 /// 1084 /// Implicit input parameter: 1085 /// - the \a ServerContext associated with this call is used for sending 1086 /// trailing (and initial if not already sent) metadata to the client. 1087 /// 1088 /// Note: there are no restrictions are the code of \a status, 1089 /// it may be non-OK 1090 // 1091 /// gRPC doesn't take ownership or a reference to \a status, so it is safe to 1092 /// to deallocate once Finish returns. 1093 void Finish(const Status& status, void* tag) override { 1094 finish_ops_.set_output_tag(tag); 1095 EnsureInitialMetadataSent(&finish_ops_); 1096 1097 finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); 1098 call_.PerformOps(&finish_ops_); 1099 } 1100 1101 private: 1102 friend class ::grpc::Server; 1103 1104 void BindCall(::grpc::internal::Call* call) override { call_ = *call; } 1105 1106 template <class T> 1107 void EnsureInitialMetadataSent(T* ops) { 1108 if (!ctx_->sent_initial_metadata_) { 1109 ops->SendInitialMetadata(ctx_->initial_metadata_, 1110 ctx_->initial_metadata_flags()); 1111 if (ctx_->compression_level_set()) { 1112 ops->set_compression_level(ctx_->compression_level()); 1113 } 1114 ctx_->sent_initial_metadata_ = true; 1115 } 1116 } 1117 1118 ::grpc::internal::Call call_; 1119 ServerContext* ctx_; 1120 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> 1121 meta_ops_; 1122 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> read_ops_; 1123 
// Op set used by Write and WriteAndFinish: initial metadata (when still
  // pending), one message, and - for WriteAndFinish - the final status.
  ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
                              ::grpc::internal::CallOpSendMessage,
                              ::grpc::internal::CallOpServerSendStatus>
      write_ops_;
  // Op set used by Finish: initial metadata (when still pending) plus the
  // final status; it has no send-message op.
  ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
                              ::grpc::internal::CallOpServerSendStatus>
      finish_ops_;
};

}  // namespace grpc

#endif  // GRPCPP_IMPL_CODEGEN_ASYNC_STREAM_H