1 /* 2 * Copyright 2015 gRPC authors. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 * 16 */ 17 18 #include <grpcpp/server.h> 19 20 #include <cstdlib> 21 #include <sstream> 22 #include <utility> 23 24 #include <grpc/grpc.h> 25 #include <grpc/support/alloc.h> 26 #include <grpc/support/log.h> 27 #include <grpcpp/completion_queue.h> 28 #include <grpcpp/generic/async_generic_service.h> 29 #include <grpcpp/impl/codegen/async_unary_call.h> 30 #include <grpcpp/impl/codegen/completion_queue_tag.h> 31 #include <grpcpp/impl/grpc_library.h> 32 #include <grpcpp/impl/method_handler_impl.h> 33 #include <grpcpp/impl/rpc_service_method.h> 34 #include <grpcpp/impl/server_initializer.h> 35 #include <grpcpp/impl/service_type.h> 36 #include <grpcpp/security/server_credentials.h> 37 #include <grpcpp/server_context.h> 38 #include <grpcpp/support/time.h> 39 40 #include "src/core/ext/transport/inproc/inproc_transport.h" 41 #include "src/core/lib/profiling/timers.h" 42 #include "src/core/lib/surface/call.h" 43 #include "src/cpp/client/create_channel_internal.h" 44 #include "src/cpp/server/health/default_health_check_service.h" 45 #include "src/cpp/thread_manager/thread_manager.h" 46 47 namespace grpc { 48 namespace { 49 50 // The default value for maximum number of threads that can be created in the 51 // sync server. This value of INT_MAX is chosen to match the default behavior if 52 // no ResourceQuota is set. To modify the max number of threads in a sync 53 // server, pass a custom ResourceQuota object (with the desired number of 54 // max-threads set) to the server builder. 55 #define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX 56 57 class DefaultGlobalCallbacks final : public Server::GlobalCallbacks { 58 public: 59 ~DefaultGlobalCallbacks() override {} 60 void PreSynchronousRequest(ServerContext* context) override {} 61 void PostSynchronousRequest(ServerContext* context) override {} 62 }; 63 64 std::shared_ptr<Server::GlobalCallbacks> g_callbacks = nullptr; 65 gpr_once g_once_init_callbacks = GPR_ONCE_INIT; 66 67 void InitGlobalCallbacks() { 68 if (!g_callbacks) { 69 g_callbacks.reset(new DefaultGlobalCallbacks()); 70 } 71 } 72 73 class ShutdownTag : public internal::CompletionQueueTag { 74 public: 75 bool FinalizeResult(void** tag, bool* status) { return false; } 76 }; 77 78 class DummyTag : public internal::CompletionQueueTag { 79 public: 80 bool FinalizeResult(void** tag, bool* status) { 81 *status = true; 82 return true; 83 } 84 }; 85 86 class UnimplementedAsyncRequestContext { 87 protected: 88 UnimplementedAsyncRequestContext() : generic_stream_(&server_context_) {} 89 90 GenericServerContext server_context_; 91 GenericServerAsyncReaderWriter generic_stream_; 92 }; 93 94 } // namespace 95 96 /// Use private inheritance rather than composition only to establish order 97 /// of construction, since the public base class should be constructed after the 98 /// elements belonging to the private base class are constructed. This is not 99 /// possible using true composition. 100 class Server::UnimplementedAsyncRequest final 101 : private UnimplementedAsyncRequestContext, 102 public GenericAsyncRequest { 103 public: 104 UnimplementedAsyncRequest(Server* server, ServerCompletionQueue* cq) 105 : GenericAsyncRequest(server, &server_context_, &generic_stream_, cq, cq, 106 nullptr, false), 107 server_(server), 108 cq_(cq) {} 109 110 bool FinalizeResult(void** tag, bool* status) override; 111 112 ServerContext* context() { return &server_context_; } 113 GenericServerAsyncReaderWriter* stream() { return &generic_stream_; } 114 115 private: 116 Server* const server_; 117 ServerCompletionQueue* const cq_; 118 }; 119 120 /// UnimplementedAsyncResponse should not post user-visible completions to the 121 /// C++ completion queue, but is generated as a CQ event by the core 122 class Server::UnimplementedAsyncResponse final 123 : public internal::CallOpSet<internal::CallOpSendInitialMetadata, 124 internal::CallOpServerSendStatus> { 125 public: 126 UnimplementedAsyncResponse(UnimplementedAsyncRequest* request); 127 ~UnimplementedAsyncResponse() { delete request_; } 128 129 bool FinalizeResult(void** tag, bool* status) override { 130 internal::CallOpSet< 131 internal::CallOpSendInitialMetadata, 132 internal::CallOpServerSendStatus>::FinalizeResult(tag, status); 133 delete this; 134 return false; 135 } 136 137 private: 138 UnimplementedAsyncRequest* const request_; 139 }; 140 141 class Server::SyncRequest final : public internal::CompletionQueueTag { 142 public: 143 SyncRequest(internal::RpcServiceMethod* method, void* tag) 144 : method_(method), 145 tag_(tag), 146 in_flight_(false), 147 has_request_payload_( 148 method->method_type() == internal::RpcMethod::NORMAL_RPC || 149 method->method_type() == internal::RpcMethod::SERVER_STREAMING), 150 call_details_(nullptr), 151 cq_(nullptr) { 152 grpc_metadata_array_init(&request_metadata_); 153 } 154 155 ~SyncRequest() { 156 if (call_details_) { 157 delete call_details_; 158 } 159 grpc_metadata_array_destroy(&request_metadata_); 160 } 161 162 void SetupRequest() { cq_ = grpc_completion_queue_create_for_pluck(nullptr); } 163 164 void TeardownRequest() { 165 grpc_completion_queue_destroy(cq_); 166 cq_ = nullptr; 167 } 168 169 void Request(grpc_server* server, grpc_completion_queue* notify_cq) { 170 GPR_ASSERT(cq_ && !in_flight_); 171 in_flight_ = true; 172 if (tag_) { 173 if (GRPC_CALL_OK != 174 grpc_server_request_registered_call( 175 server, tag_, &call_, &deadline_, &request_metadata_, 176 has_request_payload_ ? &request_payload_ : nullptr, cq_, 177 notify_cq, this)) { 178 TeardownRequest(); 179 return; 180 } 181 } else { 182 if (!call_details_) { 183 call_details_ = new grpc_call_details; 184 grpc_call_details_init(call_details_); 185 } 186 if (grpc_server_request_call(server, &call_, call_details_, 187 &request_metadata_, cq_, notify_cq, 188 this) != GRPC_CALL_OK) { 189 TeardownRequest(); 190 return; 191 } 192 } 193 } 194 195 bool FinalizeResult(void** tag, bool* status) override { 196 if (!*status) { 197 grpc_completion_queue_destroy(cq_); 198 } 199 if (call_details_) { 200 deadline_ = call_details_->deadline; 201 grpc_call_details_destroy(call_details_); 202 grpc_call_details_init(call_details_); 203 } 204 return true; 205 } 206 207 class CallData final { 208 public: 209 explicit CallData(Server* server, SyncRequest* mrd) 210 : cq_(mrd->cq_), 211 call_(mrd->call_, server, &cq_, server->max_receive_message_size()), 212 ctx_(mrd->deadline_, &mrd->request_metadata_), 213 has_request_payload_(mrd->has_request_payload_), 214 request_payload_(has_request_payload_ ? mrd->request_payload_ 215 : nullptr), 216 method_(mrd->method_), 217 server_(server) { 218 ctx_.set_call(mrd->call_); 219 ctx_.cq_ = &cq_; 220 GPR_ASSERT(mrd->in_flight_); 221 mrd->in_flight_ = false; 222 mrd->request_metadata_.count = 0; 223 } 224 225 ~CallData() { 226 if (has_request_payload_ && request_payload_) { 227 grpc_byte_buffer_destroy(request_payload_); 228 } 229 } 230 231 void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks, 232 bool resources) { 233 ctx_.BeginCompletionOp(&call_); 234 global_callbacks->PreSynchronousRequest(&ctx_); 235 auto* handler = resources ? method_->handler() 236 : server_->resource_exhausted_handler_.get(); 237 handler->RunHandler(internal::MethodHandler::HandlerParameter( 238 &call_, &ctx_, request_payload_)); 239 global_callbacks->PostSynchronousRequest(&ctx_); 240 request_payload_ = nullptr; 241 242 cq_.Shutdown(); 243 244 internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag(); 245 cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME)); 246 247 /* Ensure the cq_ is shutdown */ 248 DummyTag ignored_tag; 249 GPR_ASSERT(cq_.Pluck(&ignored_tag) == false); 250 } 251 252 private: 253 CompletionQueue cq_; 254 internal::Call call_; 255 ServerContext ctx_; 256 const bool has_request_payload_; 257 grpc_byte_buffer* request_payload_; 258 internal::RpcServiceMethod* const method_; 259 Server* server_; 260 }; 261 262 private: 263 internal::RpcServiceMethod* const method_; 264 void* const tag_; 265 bool in_flight_; 266 const bool has_request_payload_; 267 grpc_call* call_; 268 grpc_call_details* call_details_; 269 gpr_timespec deadline_; 270 grpc_metadata_array request_metadata_; 271 grpc_byte_buffer* request_payload_; 272 grpc_completion_queue* cq_; 273 }; 274 275 // Implementation of ThreadManager. Each instance of SyncRequestThreadManager 276 // manages a pool of threads that poll for incoming Sync RPCs and call the 277 // appropriate RPC handlers 278 class Server::SyncRequestThreadManager : public ThreadManager { 279 public: 280 SyncRequestThreadManager(Server* server, CompletionQueue* server_cq, 281 std::shared_ptr<GlobalCallbacks> global_callbacks, 282 grpc_resource_quota* rq, int min_pollers, 283 int max_pollers, int cq_timeout_msec) 284 : ThreadManager("SyncServer", rq, min_pollers, max_pollers), 285 server_(server), 286 server_cq_(server_cq), 287 cq_timeout_msec_(cq_timeout_msec), 288 global_callbacks_(std::move(global_callbacks)) {} 289 290 WorkStatus PollForWork(void** tag, bool* ok) override { 291 *tag = nullptr; 292 // TODO(ctiller): workaround for GPR_TIMESPAN based deadlines not working 293 // right now 294 gpr_timespec deadline = 295 gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), 296 gpr_time_from_millis(cq_timeout_msec_, GPR_TIMESPAN)); 297 298 switch (server_cq_->AsyncNext(tag, ok, deadline)) { 299 case CompletionQueue::TIMEOUT: 300 return TIMEOUT; 301 case CompletionQueue::SHUTDOWN: 302 return SHUTDOWN; 303 case CompletionQueue::GOT_EVENT: 304 return WORK_FOUND; 305 } 306 307 GPR_UNREACHABLE_CODE(return TIMEOUT); 308 } 309 310 void DoWork(void* tag, bool ok, bool resources) override { 311 SyncRequest* sync_req = static_cast<SyncRequest*>(tag); 312 313 if (!sync_req) { 314 // No tag. Nothing to work on. This is an unlikley scenario and possibly a 315 // bug in RPC Manager implementation. 316 gpr_log(GPR_ERROR, "Sync server. DoWork() was called with NULL tag"); 317 return; 318 } 319 320 if (ok) { 321 // Calldata takes ownership of the completion queue inside sync_req 322 SyncRequest::CallData cd(server_, sync_req); 323 // Prepare for the next request 324 if (!IsShutdown()) { 325 sync_req->SetupRequest(); // Create new completion queue for sync_req 326 sync_req->Request(server_->c_server(), server_cq_->cq()); 327 } 328 329 GPR_TIMER_SCOPE("cd.Run()", 0); 330 cd.Run(global_callbacks_, resources); 331 } 332 // TODO (sreek) If ok is false here (which it isn't in case of 333 // grpc_request_registered_call), we should still re-queue the request 334 // object 335 } 336 337 void AddSyncMethod(internal::RpcServiceMethod* method, void* tag) { 338 sync_requests_.emplace_back(new SyncRequest(method, tag)); 339 } 340 341 void AddUnknownSyncMethod() { 342 if (!sync_requests_.empty()) { 343 unknown_method_.reset(new internal::RpcServiceMethod( 344 "unknown", internal::RpcMethod::BIDI_STREAMING, 345 new internal::UnknownMethodHandler)); 346 sync_requests_.emplace_back( 347 new SyncRequest(unknown_method_.get(), nullptr)); 348 } 349 } 350 351 void Shutdown() override { 352 ThreadManager::Shutdown(); 353 server_cq_->Shutdown(); 354 } 355 356 void Wait() override { 357 ThreadManager::Wait(); 358 // Drain any pending items from the queue 359 void* tag; 360 bool ok; 361 while (server_cq_->Next(&tag, &ok)) { 362 // Do nothing 363 } 364 } 365 366 void Start() { 367 if (!sync_requests_.empty()) { 368 for (auto m = sync_requests_.begin(); m != sync_requests_.end(); m++) { 369 (*m)->SetupRequest(); 370 (*m)->Request(server_->c_server(), server_cq_->cq()); 371 } 372 373 Initialize(); // ThreadManager's Initialize() 374 } 375 } 376 377 private: 378 Server* server_; 379 CompletionQueue* server_cq_; 380 int cq_timeout_msec_; 381 std::vector<std::unique_ptr<SyncRequest>> sync_requests_; 382 std::unique_ptr<internal::RpcServiceMethod> unknown_method_; 383 std::unique_ptr<internal::RpcServiceMethod> health_check_; 384 std::shared_ptr<Server::GlobalCallbacks> global_callbacks_; 385 }; 386 387 static internal::GrpcLibraryInitializer g_gli_initializer; 388 Server::Server( 389 int max_receive_message_size, ChannelArguments* args, 390 std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> 391 sync_server_cqs, 392 int min_pollers, int max_pollers, int sync_cq_timeout_msec, 393 grpc_resource_quota* server_rq) 394 : max_receive_message_size_(max_receive_message_size), 395 sync_server_cqs_(std::move(sync_server_cqs)), 396 started_(false), 397 shutdown_(false), 398 shutdown_notified_(false), 399 has_generic_service_(false), 400 server_(nullptr), 401 server_initializer_(new ServerInitializer(this)), 402 health_check_service_disabled_(false) { 403 g_gli_initializer.summon(); 404 gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks); 405 global_callbacks_ = g_callbacks; 406 global_callbacks_->UpdateArguments(args); 407 408 if (sync_server_cqs_ != nullptr) { 409 bool default_rq_created = false; 410 if (server_rq == nullptr) { 411 server_rq = grpc_resource_quota_create("SyncServer-default-rq"); 412 grpc_resource_quota_set_max_threads(server_rq, 413 DEFAULT_MAX_SYNC_SERVER_THREADS); 414 default_rq_created = true; 415 } 416 417 for (const auto& it : *sync_server_cqs_) { 418 sync_req_mgrs_.emplace_back(new SyncRequestThreadManager( 419 this, it.get(), global_callbacks_, server_rq, min_pollers, 420 max_pollers, sync_cq_timeout_msec)); 421 } 422 423 if (default_rq_created) { 424 grpc_resource_quota_unref(server_rq); 425 } 426 } 427 428 grpc_channel_args channel_args; 429 args->SetChannelArgs(&channel_args); 430 431 for (size_t i = 0; i < channel_args.num_args; i++) { 432 if (0 == 433 strcmp(channel_args.args[i].key, kHealthCheckServiceInterfaceArg)) { 434 if (channel_args.args[i].value.pointer.p == nullptr) { 435 health_check_service_disabled_ = true; 436 } else { 437 health_check_service_.reset(static_cast<HealthCheckServiceInterface*>( 438 channel_args.args[i].value.pointer.p)); 439 } 440 break; 441 } 442 } 443 444 server_ = grpc_server_create(&channel_args, nullptr); 445 } 446 447 Server::~Server() { 448 { 449 std::unique_lock<std::mutex> lock(mu_); 450 if (started_ && !shutdown_) { 451 lock.unlock(); 452 Shutdown(); 453 } else if (!started_) { 454 // Shutdown the completion queues 455 for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { 456 (*it)->Shutdown(); 457 } 458 } 459 } 460 461 grpc_server_destroy(server_); 462 } 463 464 void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) { 465 GPR_ASSERT(!g_callbacks); 466 GPR_ASSERT(callbacks); 467 g_callbacks.reset(callbacks); 468 } 469 470 grpc_server* Server::c_server() { return server_; } 471 472 std::shared_ptr<Channel> Server::InProcessChannel( 473 const ChannelArguments& args) { 474 grpc_channel_args channel_args = args.c_channel_args(); 475 return CreateChannelInternal( 476 "inproc", grpc_inproc_channel_create(server_, &channel_args, nullptr)); 477 } 478 479 static grpc_server_register_method_payload_handling PayloadHandlingForMethod( 480 internal::RpcServiceMethod* method) { 481 switch (method->method_type()) { 482 case internal::RpcMethod::NORMAL_RPC: 483 case internal::RpcMethod::SERVER_STREAMING: 484 return GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER; 485 case internal::RpcMethod::CLIENT_STREAMING: 486 case internal::RpcMethod::BIDI_STREAMING: 487 return GRPC_SRM_PAYLOAD_NONE; 488 } 489 GPR_UNREACHABLE_CODE(return GRPC_SRM_PAYLOAD_NONE;); 490 } 491 492 bool Server::RegisterService(const grpc::string* host, Service* service) { 493 bool has_async_methods = service->has_async_methods(); 494 if (has_async_methods) { 495 GPR_ASSERT(service->server_ == nullptr && 496 "Can only register an asynchronous service against one server."); 497 service->server_ = this; 498 } 499 500 const char* method_name = nullptr; 501 for (auto it = service->methods_.begin(); it != service->methods_.end(); 502 ++it) { 503 if (it->get() == nullptr) { // Handled by generic service if any. 504 continue; 505 } 506 507 internal::RpcServiceMethod* method = it->get(); 508 void* tag = grpc_server_register_method( 509 server_, method->name(), host ? host->c_str() : nullptr, 510 PayloadHandlingForMethod(method), 0); 511 if (tag == nullptr) { 512 gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", 513 method->name()); 514 return false; 515 } 516 517 if (method->handler() == nullptr) { // Async method 518 method->set_server_tag(tag); 519 } else { 520 for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { 521 (*it)->AddSyncMethod(method, tag); 522 } 523 } 524 525 method_name = method->name(); 526 } 527 528 // Parse service name. 529 if (method_name != nullptr) { 530 std::stringstream ss(method_name); 531 grpc::string service_name; 532 if (std::getline(ss, service_name, '/') && 533 std::getline(ss, service_name, '/')) { 534 services_.push_back(service_name); 535 } 536 } 537 return true; 538 } 539 540 void Server::RegisterAsyncGenericService(AsyncGenericService* service) { 541 GPR_ASSERT(service->server_ == nullptr && 542 "Can only register an async generic service against one server."); 543 service->server_ = this; 544 has_generic_service_ = true; 545 } 546 547 int Server::AddListeningPort(const grpc::string& addr, 548 ServerCredentials* creds) { 549 GPR_ASSERT(!started_); 550 int port = creds->AddPortToServer(addr, server_); 551 global_callbacks_->AddPort(this, addr, creds, port); 552 return port; 553 } 554 555 void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { 556 GPR_ASSERT(!started_); 557 global_callbacks_->PreServerStart(this); 558 started_ = true; 559 560 // Only create default health check service when user did not provide an 561 // explicit one. 562 if (health_check_service_ == nullptr && !health_check_service_disabled_ && 563 DefaultHealthCheckServiceEnabled()) { 564 if (sync_server_cqs_ == nullptr || sync_server_cqs_->empty()) { 565 gpr_log(GPR_INFO, 566 "Default health check service disabled at async-only server."); 567 } else { 568 auto* default_hc_service = new DefaultHealthCheckService; 569 health_check_service_.reset(default_hc_service); 570 RegisterService(nullptr, default_hc_service->GetHealthCheckService()); 571 } 572 } 573 574 grpc_server_start(server_); 575 576 if (!has_generic_service_) { 577 for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { 578 (*it)->AddUnknownSyncMethod(); 579 } 580 581 for (size_t i = 0; i < num_cqs; i++) { 582 if (cqs[i]->IsFrequentlyPolled()) { 583 new UnimplementedAsyncRequest(this, cqs[i]); 584 } 585 } 586 } 587 588 // If this server has any support for synchronous methods (has any sync 589 // server CQs), make sure that we have a ResourceExhausted handler 590 // to deal with the case of thread exhaustion 591 if (sync_server_cqs_ != nullptr && !sync_server_cqs_->empty()) { 592 resource_exhausted_handler_.reset(new internal::ResourceExhaustedHandler); 593 } 594 595 for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { 596 (*it)->Start(); 597 } 598 } 599 600 void Server::ShutdownInternal(gpr_timespec deadline) { 601 std::unique_lock<std::mutex> lock(mu_); 602 if (!shutdown_) { 603 shutdown_ = true; 604 605 /// The completion queue to use for server shutdown completion notification 606 CompletionQueue shutdown_cq; 607 ShutdownTag shutdown_tag; // Dummy shutdown tag 608 grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag); 609 610 shutdown_cq.Shutdown(); 611 612 void* tag; 613 bool ok; 614 CompletionQueue::NextStatus status = 615 shutdown_cq.AsyncNext(&tag, &ok, deadline); 616 617 // If this timed out, it means we are done with the grace period for a clean 618 // shutdown. We should force a shutdown now by cancelling all inflight calls 619 if (status == CompletionQueue::NextStatus::TIMEOUT) { 620 grpc_server_cancel_all_calls(server_); 621 } 622 // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has 623 // successfully shutdown 624 625 // Shutdown all ThreadManagers. This will try to gracefully stop all the 626 // threads in the ThreadManagers (once they process any inflight requests) 627 for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { 628 (*it)->Shutdown(); // ThreadManager's Shutdown() 629 } 630 631 // Wait for threads in all ThreadManagers to terminate 632 for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { 633 (*it)->Wait(); 634 } 635 636 // Drain the shutdown queue (if the previous call to AsyncNext() timed out 637 // and we didn't remove the tag from the queue yet) 638 while (shutdown_cq.Next(&tag, &ok)) { 639 // Nothing to be done here. Just ignore ok and tag values 640 } 641 642 shutdown_notified_ = true; 643 shutdown_cv_.notify_all(); 644 } 645 } 646 647 void Server::Wait() { 648 std::unique_lock<std::mutex> lock(mu_); 649 while (started_ && !shutdown_notified_) { 650 shutdown_cv_.wait(lock); 651 } 652 } 653 654 void Server::PerformOpsOnCall(internal::CallOpSetInterface* ops, 655 internal::Call* call) { 656 static const size_t MAX_OPS = 8; 657 size_t nops = 0; 658 grpc_op cops[MAX_OPS]; 659 ops->FillOps(call->call(), cops, &nops); 660 // TODO(vjpai): Use ops->cq_tag once this case supports callbacks 661 auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr); 662 if (result != GRPC_CALL_OK) { 663 gpr_log(GPR_ERROR, "Fatal: grpc_call_start_batch returned %d", result); 664 grpc_call_log_batch(__FILE__, __LINE__, GPR_LOG_SEVERITY_ERROR, 665 call->call(), cops, nops, ops); 666 abort(); 667 } 668 } 669 670 ServerInterface::BaseAsyncRequest::BaseAsyncRequest( 671 ServerInterface* server, ServerContext* context, 672 internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, 673 void* tag, bool delete_on_finalize) 674 : server_(server), 675 context_(context), 676 stream_(stream), 677 call_cq_(call_cq), 678 tag_(tag), 679 delete_on_finalize_(delete_on_finalize), 680 call_(nullptr) { 681 call_cq_->RegisterAvalanching(); // This op will trigger more ops 682 } 683 684 ServerInterface::BaseAsyncRequest::~BaseAsyncRequest() { 685 call_cq_->CompleteAvalanching(); 686 } 687 688 bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, 689 bool* status) { 690 context_->set_call(call_); 691 context_->cq_ = call_cq_; 692 internal::Call call(call_, server_, call_cq_, 693 server_->max_receive_message_size()); 694 if (*status && call_) { 695 context_->BeginCompletionOp(&call); 696 } 697 // just the pointers inside call are copied here 698 stream_->BindCall(&call); 699 *tag = tag_; 700 if (delete_on_finalize_) { 701 delete this; 702 } 703 return true; 704 } 705 706 ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest( 707 ServerInterface* server, ServerContext* context, 708 internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, 709 void* tag) 710 : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {} 711 712 void ServerInterface::RegisteredAsyncRequest::IssueRequest( 713 void* registered_method, grpc_byte_buffer** payload, 714 ServerCompletionQueue* notification_cq) { 715 GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_registered_call( 716 server_->server(), registered_method, &call_, 717 &context_->deadline_, 718 context_->client_metadata_.arr(), payload, 719 call_cq_->cq(), notification_cq->cq(), this)); 720 } 721 722 ServerInterface::GenericAsyncRequest::GenericAsyncRequest( 723 ServerInterface* server, GenericServerContext* context, 724 internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, 725 ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) 726 : BaseAsyncRequest(server, context, stream, call_cq, tag, 727 delete_on_finalize) { 728 grpc_call_details_init(&call_details_); 729 GPR_ASSERT(notification_cq); 730 GPR_ASSERT(call_cq); 731 GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call( 732 server->server(), &call_, &call_details_, 733 context->client_metadata_.arr(), call_cq->cq(), 734 notification_cq->cq(), this)); 735 } 736 737 bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag, 738 bool* status) { 739 // TODO(yangg) remove the copy here. 740 if (*status) { 741 static_cast<GenericServerContext*>(context_)->method_ = 742 StringFromCopiedSlice(call_details_.method); 743 static_cast<GenericServerContext*>(context_)->host_ = 744 StringFromCopiedSlice(call_details_.host); 745 context_->deadline_ = call_details_.deadline; 746 } 747 grpc_slice_unref(call_details_.method); 748 grpc_slice_unref(call_details_.host); 749 return BaseAsyncRequest::FinalizeResult(tag, status); 750 } 751 752 bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag, 753 bool* status) { 754 if (GenericAsyncRequest::FinalizeResult(tag, status) && *status) { 755 new UnimplementedAsyncRequest(server_, cq_); 756 new UnimplementedAsyncResponse(this); 757 } else { 758 delete this; 759 } 760 return false; 761 } 762 763 Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse( 764 UnimplementedAsyncRequest* request) 765 : request_(request) { 766 Status status(StatusCode::UNIMPLEMENTED, ""); 767 internal::UnknownMethodHandler::FillOps(request_->context(), this); 768 request_->stream()->call_.PerformOps(this); 769 } 770 771 ServerInitializer* Server::initializer() { return server_initializer_.get(); } 772 773 } // namespace grpc 774