Home | History | Annotate | Download | only in server
      1 /*
      2  * Copyright 2015 gRPC authors.
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *     http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  *
     16  */
     17 
     18 #include <grpcpp/server.h>
     19 
     20 #include <cstdlib>
     21 #include <sstream>
     22 #include <utility>
     23 
     24 #include <grpc/grpc.h>
     25 #include <grpc/support/alloc.h>
     26 #include <grpc/support/log.h>
     27 #include <grpcpp/completion_queue.h>
     28 #include <grpcpp/generic/async_generic_service.h>
     29 #include <grpcpp/impl/codegen/async_unary_call.h>
     30 #include <grpcpp/impl/codegen/completion_queue_tag.h>
     31 #include <grpcpp/impl/grpc_library.h>
     32 #include <grpcpp/impl/method_handler_impl.h>
     33 #include <grpcpp/impl/rpc_service_method.h>
     34 #include <grpcpp/impl/server_initializer.h>
     35 #include <grpcpp/impl/service_type.h>
     36 #include <grpcpp/security/server_credentials.h>
     37 #include <grpcpp/server_context.h>
     38 #include <grpcpp/support/time.h>
     39 
     40 #include "src/core/ext/transport/inproc/inproc_transport.h"
     41 #include "src/core/lib/profiling/timers.h"
     42 #include "src/core/lib/surface/call.h"
     43 #include "src/cpp/client/create_channel_internal.h"
     44 #include "src/cpp/server/health/default_health_check_service.h"
     45 #include "src/cpp/thread_manager/thread_manager.h"
     46 
     47 namespace grpc {
     48 namespace {
     49 
     50 // The default value for maximum number of threads that can be created in the
     51 // sync server. This value of INT_MAX is chosen to match the default behavior if
     52 // no ResourceQuota is set. To modify the max number of threads in a sync
     53 // server, pass a custom ResourceQuota object  (with the desired number of
     54 // max-threads set) to the server builder.
     55 #define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX
     56 
     57 class DefaultGlobalCallbacks final : public Server::GlobalCallbacks {
     58  public:
     59   ~DefaultGlobalCallbacks() override {}
     60   void PreSynchronousRequest(ServerContext* context) override {}
     61   void PostSynchronousRequest(ServerContext* context) override {}
     62 };
     63 
     64 std::shared_ptr<Server::GlobalCallbacks> g_callbacks = nullptr;
     65 gpr_once g_once_init_callbacks = GPR_ONCE_INIT;
     66 
     67 void InitGlobalCallbacks() {
     68   if (!g_callbacks) {
     69     g_callbacks.reset(new DefaultGlobalCallbacks());
     70   }
     71 }
     72 
     73 class ShutdownTag : public internal::CompletionQueueTag {
     74  public:
     75   bool FinalizeResult(void** tag, bool* status) { return false; }
     76 };
     77 
     78 class DummyTag : public internal::CompletionQueueTag {
     79  public:
     80   bool FinalizeResult(void** tag, bool* status) {
     81     *status = true;
     82     return true;
     83   }
     84 };
     85 
     86 class UnimplementedAsyncRequestContext {
     87  protected:
     88   UnimplementedAsyncRequestContext() : generic_stream_(&server_context_) {}
     89 
     90   GenericServerContext server_context_;
     91   GenericServerAsyncReaderWriter generic_stream_;
     92 };
     93 
     94 }  // namespace
     95 
     96 /// Use private inheritance rather than composition only to establish order
     97 /// of construction, since the public base class should be constructed after the
     98 /// elements belonging to the private base class are constructed. This is not
     99 /// possible using true composition.
    100 class Server::UnimplementedAsyncRequest final
    101     : private UnimplementedAsyncRequestContext,
    102       public GenericAsyncRequest {
    103  public:
    104   UnimplementedAsyncRequest(Server* server, ServerCompletionQueue* cq)
    105       : GenericAsyncRequest(server, &server_context_, &generic_stream_, cq, cq,
    106                             nullptr, false),
    107         server_(server),
    108         cq_(cq) {}
    109 
    110   bool FinalizeResult(void** tag, bool* status) override;
    111 
    112   ServerContext* context() { return &server_context_; }
    113   GenericServerAsyncReaderWriter* stream() { return &generic_stream_; }
    114 
    115  private:
    116   Server* const server_;
    117   ServerCompletionQueue* const cq_;
    118 };
    119 
    120 /// UnimplementedAsyncResponse should not post user-visible completions to the
    121 /// C++ completion queue, but is generated as a CQ event by the core
    122 class Server::UnimplementedAsyncResponse final
    123     : public internal::CallOpSet<internal::CallOpSendInitialMetadata,
    124                                  internal::CallOpServerSendStatus> {
    125  public:
    126   UnimplementedAsyncResponse(UnimplementedAsyncRequest* request);
    127   ~UnimplementedAsyncResponse() { delete request_; }
    128 
    129   bool FinalizeResult(void** tag, bool* status) override {
    130     internal::CallOpSet<
    131         internal::CallOpSendInitialMetadata,
    132         internal::CallOpServerSendStatus>::FinalizeResult(tag, status);
    133     delete this;
    134     return false;
    135   }
    136 
    137  private:
    138   UnimplementedAsyncRequest* const request_;
    139 };
    140 
    141 class Server::SyncRequest final : public internal::CompletionQueueTag {
    142  public:
    143   SyncRequest(internal::RpcServiceMethod* method, void* tag)
    144       : method_(method),
    145         tag_(tag),
    146         in_flight_(false),
    147         has_request_payload_(
    148             method->method_type() == internal::RpcMethod::NORMAL_RPC ||
    149             method->method_type() == internal::RpcMethod::SERVER_STREAMING),
    150         call_details_(nullptr),
    151         cq_(nullptr) {
    152     grpc_metadata_array_init(&request_metadata_);
    153   }
    154 
    155   ~SyncRequest() {
    156     if (call_details_) {
    157       delete call_details_;
    158     }
    159     grpc_metadata_array_destroy(&request_metadata_);
    160   }
    161 
    162   void SetupRequest() { cq_ = grpc_completion_queue_create_for_pluck(nullptr); }
    163 
    164   void TeardownRequest() {
    165     grpc_completion_queue_destroy(cq_);
    166     cq_ = nullptr;
    167   }
    168 
    169   void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
    170     GPR_ASSERT(cq_ && !in_flight_);
    171     in_flight_ = true;
    172     if (tag_) {
    173       if (GRPC_CALL_OK !=
    174           grpc_server_request_registered_call(
    175               server, tag_, &call_, &deadline_, &request_metadata_,
    176               has_request_payload_ ? &request_payload_ : nullptr, cq_,
    177               notify_cq, this)) {
    178         TeardownRequest();
    179         return;
    180       }
    181     } else {
    182       if (!call_details_) {
    183         call_details_ = new grpc_call_details;
    184         grpc_call_details_init(call_details_);
    185       }
    186       if (grpc_server_request_call(server, &call_, call_details_,
    187                                    &request_metadata_, cq_, notify_cq,
    188                                    this) != GRPC_CALL_OK) {
    189         TeardownRequest();
    190         return;
    191       }
    192     }
    193   }
    194 
    195   bool FinalizeResult(void** tag, bool* status) override {
    196     if (!*status) {
    197       grpc_completion_queue_destroy(cq_);
    198     }
    199     if (call_details_) {
    200       deadline_ = call_details_->deadline;
    201       grpc_call_details_destroy(call_details_);
    202       grpc_call_details_init(call_details_);
    203     }
    204     return true;
    205   }
    206 
    207   class CallData final {
    208    public:
    209     explicit CallData(Server* server, SyncRequest* mrd)
    210         : cq_(mrd->cq_),
    211           call_(mrd->call_, server, &cq_, server->max_receive_message_size()),
    212           ctx_(mrd->deadline_, &mrd->request_metadata_),
    213           has_request_payload_(mrd->has_request_payload_),
    214           request_payload_(has_request_payload_ ? mrd->request_payload_
    215                                                 : nullptr),
    216           method_(mrd->method_),
    217           server_(server) {
    218       ctx_.set_call(mrd->call_);
    219       ctx_.cq_ = &cq_;
    220       GPR_ASSERT(mrd->in_flight_);
    221       mrd->in_flight_ = false;
    222       mrd->request_metadata_.count = 0;
    223     }
    224 
    225     ~CallData() {
    226       if (has_request_payload_ && request_payload_) {
    227         grpc_byte_buffer_destroy(request_payload_);
    228       }
    229     }
    230 
    231     void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks,
    232              bool resources) {
    233       ctx_.BeginCompletionOp(&call_);
    234       global_callbacks->PreSynchronousRequest(&ctx_);
    235       auto* handler = resources ? method_->handler()
    236                                 : server_->resource_exhausted_handler_.get();
    237       handler->RunHandler(internal::MethodHandler::HandlerParameter(
    238           &call_, &ctx_, request_payload_));
    239       global_callbacks->PostSynchronousRequest(&ctx_);
    240       request_payload_ = nullptr;
    241 
    242       cq_.Shutdown();
    243 
    244       internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag();
    245       cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME));
    246 
    247       /* Ensure the cq_ is shutdown */
    248       DummyTag ignored_tag;
    249       GPR_ASSERT(cq_.Pluck(&ignored_tag) == false);
    250     }
    251 
    252    private:
    253     CompletionQueue cq_;
    254     internal::Call call_;
    255     ServerContext ctx_;
    256     const bool has_request_payload_;
    257     grpc_byte_buffer* request_payload_;
    258     internal::RpcServiceMethod* const method_;
    259     Server* server_;
    260   };
    261 
    262  private:
    263   internal::RpcServiceMethod* const method_;
    264   void* const tag_;
    265   bool in_flight_;
    266   const bool has_request_payload_;
    267   grpc_call* call_;
    268   grpc_call_details* call_details_;
    269   gpr_timespec deadline_;
    270   grpc_metadata_array request_metadata_;
    271   grpc_byte_buffer* request_payload_;
    272   grpc_completion_queue* cq_;
    273 };
    274 
    275 // Implementation of ThreadManager. Each instance of SyncRequestThreadManager
    276 // manages a pool of threads that poll for incoming Sync RPCs and call the
    277 // appropriate RPC handlers
    278 class Server::SyncRequestThreadManager : public ThreadManager {
    279  public:
    280   SyncRequestThreadManager(Server* server, CompletionQueue* server_cq,
    281                            std::shared_ptr<GlobalCallbacks> global_callbacks,
    282                            grpc_resource_quota* rq, int min_pollers,
    283                            int max_pollers, int cq_timeout_msec)
    284       : ThreadManager("SyncServer", rq, min_pollers, max_pollers),
    285         server_(server),
    286         server_cq_(server_cq),
    287         cq_timeout_msec_(cq_timeout_msec),
    288         global_callbacks_(std::move(global_callbacks)) {}
    289 
    290   WorkStatus PollForWork(void** tag, bool* ok) override {
    291     *tag = nullptr;
    292     // TODO(ctiller): workaround for GPR_TIMESPAN based deadlines not working
    293     // right now
    294     gpr_timespec deadline =
    295         gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
    296                      gpr_time_from_millis(cq_timeout_msec_, GPR_TIMESPAN));
    297 
    298     switch (server_cq_->AsyncNext(tag, ok, deadline)) {
    299       case CompletionQueue::TIMEOUT:
    300         return TIMEOUT;
    301       case CompletionQueue::SHUTDOWN:
    302         return SHUTDOWN;
    303       case CompletionQueue::GOT_EVENT:
    304         return WORK_FOUND;
    305     }
    306 
    307     GPR_UNREACHABLE_CODE(return TIMEOUT);
    308   }
    309 
    310   void DoWork(void* tag, bool ok, bool resources) override {
    311     SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
    312 
    313     if (!sync_req) {
    314       // No tag. Nothing to work on. This is an unlikley scenario and possibly a
    315       // bug in RPC Manager implementation.
    316       gpr_log(GPR_ERROR, "Sync server. DoWork() was called with NULL tag");
    317       return;
    318     }
    319 
    320     if (ok) {
    321       // Calldata takes ownership of the completion queue inside sync_req
    322       SyncRequest::CallData cd(server_, sync_req);
    323       // Prepare for the next request
    324       if (!IsShutdown()) {
    325         sync_req->SetupRequest();  // Create new completion queue for sync_req
    326         sync_req->Request(server_->c_server(), server_cq_->cq());
    327       }
    328 
    329       GPR_TIMER_SCOPE("cd.Run()", 0);
    330       cd.Run(global_callbacks_, resources);
    331     }
    332     // TODO (sreek) If ok is false here (which it isn't in case of
    333     // grpc_request_registered_call), we should still re-queue the request
    334     // object
    335   }
    336 
    337   void AddSyncMethod(internal::RpcServiceMethod* method, void* tag) {
    338     sync_requests_.emplace_back(new SyncRequest(method, tag));
    339   }
    340 
    341   void AddUnknownSyncMethod() {
    342     if (!sync_requests_.empty()) {
    343       unknown_method_.reset(new internal::RpcServiceMethod(
    344           "unknown", internal::RpcMethod::BIDI_STREAMING,
    345           new internal::UnknownMethodHandler));
    346       sync_requests_.emplace_back(
    347           new SyncRequest(unknown_method_.get(), nullptr));
    348     }
    349   }
    350 
    351   void Shutdown() override {
    352     ThreadManager::Shutdown();
    353     server_cq_->Shutdown();
    354   }
    355 
    356   void Wait() override {
    357     ThreadManager::Wait();
    358     // Drain any pending items from the queue
    359     void* tag;
    360     bool ok;
    361     while (server_cq_->Next(&tag, &ok)) {
    362       // Do nothing
    363     }
    364   }
    365 
    366   void Start() {
    367     if (!sync_requests_.empty()) {
    368       for (auto m = sync_requests_.begin(); m != sync_requests_.end(); m++) {
    369         (*m)->SetupRequest();
    370         (*m)->Request(server_->c_server(), server_cq_->cq());
    371       }
    372 
    373       Initialize();  // ThreadManager's Initialize()
    374     }
    375   }
    376 
    377  private:
    378   Server* server_;
    379   CompletionQueue* server_cq_;
    380   int cq_timeout_msec_;
    381   std::vector<std::unique_ptr<SyncRequest>> sync_requests_;
    382   std::unique_ptr<internal::RpcServiceMethod> unknown_method_;
    383   std::unique_ptr<internal::RpcServiceMethod> health_check_;
    384   std::shared_ptr<Server::GlobalCallbacks> global_callbacks_;
    385 };
    386 
    387 static internal::GrpcLibraryInitializer g_gli_initializer;
    388 Server::Server(
    389     int max_receive_message_size, ChannelArguments* args,
    390     std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
    391         sync_server_cqs,
    392     int min_pollers, int max_pollers, int sync_cq_timeout_msec,
    393     grpc_resource_quota* server_rq)
    394     : max_receive_message_size_(max_receive_message_size),
    395       sync_server_cqs_(std::move(sync_server_cqs)),
    396       started_(false),
    397       shutdown_(false),
    398       shutdown_notified_(false),
    399       has_generic_service_(false),
    400       server_(nullptr),
    401       server_initializer_(new ServerInitializer(this)),
    402       health_check_service_disabled_(false) {
    403   g_gli_initializer.summon();
    404   gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks);
    405   global_callbacks_ = g_callbacks;
    406   global_callbacks_->UpdateArguments(args);
    407 
    408   if (sync_server_cqs_ != nullptr) {
    409     bool default_rq_created = false;
    410     if (server_rq == nullptr) {
    411       server_rq = grpc_resource_quota_create("SyncServer-default-rq");
    412       grpc_resource_quota_set_max_threads(server_rq,
    413                                           DEFAULT_MAX_SYNC_SERVER_THREADS);
    414       default_rq_created = true;
    415     }
    416 
    417     for (const auto& it : *sync_server_cqs_) {
    418       sync_req_mgrs_.emplace_back(new SyncRequestThreadManager(
    419           this, it.get(), global_callbacks_, server_rq, min_pollers,
    420           max_pollers, sync_cq_timeout_msec));
    421     }
    422 
    423     if (default_rq_created) {
    424       grpc_resource_quota_unref(server_rq);
    425     }
    426   }
    427 
    428   grpc_channel_args channel_args;
    429   args->SetChannelArgs(&channel_args);
    430 
    431   for (size_t i = 0; i < channel_args.num_args; i++) {
    432     if (0 ==
    433         strcmp(channel_args.args[i].key, kHealthCheckServiceInterfaceArg)) {
    434       if (channel_args.args[i].value.pointer.p == nullptr) {
    435         health_check_service_disabled_ = true;
    436       } else {
    437         health_check_service_.reset(static_cast<HealthCheckServiceInterface*>(
    438             channel_args.args[i].value.pointer.p));
    439       }
    440       break;
    441     }
    442   }
    443 
    444   server_ = grpc_server_create(&channel_args, nullptr);
    445 }
    446 
    447 Server::~Server() {
    448   {
    449     std::unique_lock<std::mutex> lock(mu_);
    450     if (started_ && !shutdown_) {
    451       lock.unlock();
    452       Shutdown();
    453     } else if (!started_) {
    454       // Shutdown the completion queues
    455       for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
    456         (*it)->Shutdown();
    457       }
    458     }
    459   }
    460 
    461   grpc_server_destroy(server_);
    462 }
    463 
    464 void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
    465   GPR_ASSERT(!g_callbacks);
    466   GPR_ASSERT(callbacks);
    467   g_callbacks.reset(callbacks);
    468 }
    469 
    470 grpc_server* Server::c_server() { return server_; }
    471 
    472 std::shared_ptr<Channel> Server::InProcessChannel(
    473     const ChannelArguments& args) {
    474   grpc_channel_args channel_args = args.c_channel_args();
    475   return CreateChannelInternal(
    476       "inproc", grpc_inproc_channel_create(server_, &channel_args, nullptr));
    477 }
    478 
    479 static grpc_server_register_method_payload_handling PayloadHandlingForMethod(
    480     internal::RpcServiceMethod* method) {
    481   switch (method->method_type()) {
    482     case internal::RpcMethod::NORMAL_RPC:
    483     case internal::RpcMethod::SERVER_STREAMING:
    484       return GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER;
    485     case internal::RpcMethod::CLIENT_STREAMING:
    486     case internal::RpcMethod::BIDI_STREAMING:
    487       return GRPC_SRM_PAYLOAD_NONE;
    488   }
    489   GPR_UNREACHABLE_CODE(return GRPC_SRM_PAYLOAD_NONE;);
    490 }
    491 
    492 bool Server::RegisterService(const grpc::string* host, Service* service) {
    493   bool has_async_methods = service->has_async_methods();
    494   if (has_async_methods) {
    495     GPR_ASSERT(service->server_ == nullptr &&
    496                "Can only register an asynchronous service against one server.");
    497     service->server_ = this;
    498   }
    499 
    500   const char* method_name = nullptr;
    501   for (auto it = service->methods_.begin(); it != service->methods_.end();
    502        ++it) {
    503     if (it->get() == nullptr) {  // Handled by generic service if any.
    504       continue;
    505     }
    506 
    507     internal::RpcServiceMethod* method = it->get();
    508     void* tag = grpc_server_register_method(
    509         server_, method->name(), host ? host->c_str() : nullptr,
    510         PayloadHandlingForMethod(method), 0);
    511     if (tag == nullptr) {
    512       gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
    513               method->name());
    514       return false;
    515     }
    516 
    517     if (method->handler() == nullptr) {  // Async method
    518       method->set_server_tag(tag);
    519     } else {
    520       for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
    521         (*it)->AddSyncMethod(method, tag);
    522       }
    523     }
    524 
    525     method_name = method->name();
    526   }
    527 
    528   // Parse service name.
    529   if (method_name != nullptr) {
    530     std::stringstream ss(method_name);
    531     grpc::string service_name;
    532     if (std::getline(ss, service_name, '/') &&
    533         std::getline(ss, service_name, '/')) {
    534       services_.push_back(service_name);
    535     }
    536   }
    537   return true;
    538 }
    539 
    540 void Server::RegisterAsyncGenericService(AsyncGenericService* service) {
    541   GPR_ASSERT(service->server_ == nullptr &&
    542              "Can only register an async generic service against one server.");
    543   service->server_ = this;
    544   has_generic_service_ = true;
    545 }
    546 
    547 int Server::AddListeningPort(const grpc::string& addr,
    548                              ServerCredentials* creds) {
    549   GPR_ASSERT(!started_);
    550   int port = creds->AddPortToServer(addr, server_);
    551   global_callbacks_->AddPort(this, addr, creds, port);
    552   return port;
    553 }
    554 
    555 void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
    556   GPR_ASSERT(!started_);
    557   global_callbacks_->PreServerStart(this);
    558   started_ = true;
    559 
    560   // Only create default health check service when user did not provide an
    561   // explicit one.
    562   if (health_check_service_ == nullptr && !health_check_service_disabled_ &&
    563       DefaultHealthCheckServiceEnabled()) {
    564     if (sync_server_cqs_ == nullptr || sync_server_cqs_->empty()) {
    565       gpr_log(GPR_INFO,
    566               "Default health check service disabled at async-only server.");
    567     } else {
    568       auto* default_hc_service = new DefaultHealthCheckService;
    569       health_check_service_.reset(default_hc_service);
    570       RegisterService(nullptr, default_hc_service->GetHealthCheckService());
    571     }
    572   }
    573 
    574   grpc_server_start(server_);
    575 
    576   if (!has_generic_service_) {
    577     for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
    578       (*it)->AddUnknownSyncMethod();
    579     }
    580 
    581     for (size_t i = 0; i < num_cqs; i++) {
    582       if (cqs[i]->IsFrequentlyPolled()) {
    583         new UnimplementedAsyncRequest(this, cqs[i]);
    584       }
    585     }
    586   }
    587 
    588   // If this server has any support for synchronous methods (has any sync
    589   // server CQs), make sure that we have a ResourceExhausted handler
    590   // to deal with the case of thread exhaustion
    591   if (sync_server_cqs_ != nullptr && !sync_server_cqs_->empty()) {
    592     resource_exhausted_handler_.reset(new internal::ResourceExhaustedHandler);
    593   }
    594 
    595   for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
    596     (*it)->Start();
    597   }
    598 }
    599 
    600 void Server::ShutdownInternal(gpr_timespec deadline) {
    601   std::unique_lock<std::mutex> lock(mu_);
    602   if (!shutdown_) {
    603     shutdown_ = true;
    604 
    605     /// The completion queue to use for server shutdown completion notification
    606     CompletionQueue shutdown_cq;
    607     ShutdownTag shutdown_tag;  // Dummy shutdown tag
    608     grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag);
    609 
    610     shutdown_cq.Shutdown();
    611 
    612     void* tag;
    613     bool ok;
    614     CompletionQueue::NextStatus status =
    615         shutdown_cq.AsyncNext(&tag, &ok, deadline);
    616 
    617     // If this timed out, it means we are done with the grace period for a clean
    618     // shutdown. We should force a shutdown now by cancelling all inflight calls
    619     if (status == CompletionQueue::NextStatus::TIMEOUT) {
    620       grpc_server_cancel_all_calls(server_);
    621     }
    622     // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has
    623     // successfully shutdown
    624 
    625     // Shutdown all ThreadManagers. This will try to gracefully stop all the
    626     // threads in the ThreadManagers (once they process any inflight requests)
    627     for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
    628       (*it)->Shutdown();  // ThreadManager's Shutdown()
    629     }
    630 
    631     // Wait for threads in all ThreadManagers to terminate
    632     for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
    633       (*it)->Wait();
    634     }
    635 
    636     // Drain the shutdown queue (if the previous call to AsyncNext() timed out
    637     // and we didn't remove the tag from the queue yet)
    638     while (shutdown_cq.Next(&tag, &ok)) {
    639       // Nothing to be done here. Just ignore ok and tag values
    640     }
    641 
    642     shutdown_notified_ = true;
    643     shutdown_cv_.notify_all();
    644   }
    645 }
    646 
    647 void Server::Wait() {
    648   std::unique_lock<std::mutex> lock(mu_);
    649   while (started_ && !shutdown_notified_) {
    650     shutdown_cv_.wait(lock);
    651   }
    652 }
    653 
    654 void Server::PerformOpsOnCall(internal::CallOpSetInterface* ops,
    655                               internal::Call* call) {
    656   static const size_t MAX_OPS = 8;
    657   size_t nops = 0;
    658   grpc_op cops[MAX_OPS];
    659   ops->FillOps(call->call(), cops, &nops);
    660   // TODO(vjpai): Use ops->cq_tag once this case supports callbacks
    661   auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr);
    662   if (result != GRPC_CALL_OK) {
    663     gpr_log(GPR_ERROR, "Fatal: grpc_call_start_batch returned %d", result);
    664     grpc_call_log_batch(__FILE__, __LINE__, GPR_LOG_SEVERITY_ERROR,
    665                         call->call(), cops, nops, ops);
    666     abort();
    667   }
    668 }
    669 
    670 ServerInterface::BaseAsyncRequest::BaseAsyncRequest(
    671     ServerInterface* server, ServerContext* context,
    672     internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
    673     void* tag, bool delete_on_finalize)
    674     : server_(server),
    675       context_(context),
    676       stream_(stream),
    677       call_cq_(call_cq),
    678       tag_(tag),
    679       delete_on_finalize_(delete_on_finalize),
    680       call_(nullptr) {
    681   call_cq_->RegisterAvalanching();  // This op will trigger more ops
    682 }
    683 
    684 ServerInterface::BaseAsyncRequest::~BaseAsyncRequest() {
    685   call_cq_->CompleteAvalanching();
    686 }
    687 
    688 bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
    689                                                        bool* status) {
    690   context_->set_call(call_);
    691   context_->cq_ = call_cq_;
    692   internal::Call call(call_, server_, call_cq_,
    693                       server_->max_receive_message_size());
    694   if (*status && call_) {
    695     context_->BeginCompletionOp(&call);
    696   }
    697   // just the pointers inside call are copied here
    698   stream_->BindCall(&call);
    699   *tag = tag_;
    700   if (delete_on_finalize_) {
    701     delete this;
    702   }
    703   return true;
    704 }
    705 
    706 ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest(
    707     ServerInterface* server, ServerContext* context,
    708     internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
    709     void* tag)
    710     : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {}
    711 
    712 void ServerInterface::RegisteredAsyncRequest::IssueRequest(
    713     void* registered_method, grpc_byte_buffer** payload,
    714     ServerCompletionQueue* notification_cq) {
    715   GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_registered_call(
    716                                  server_->server(), registered_method, &call_,
    717                                  &context_->deadline_,
    718                                  context_->client_metadata_.arr(), payload,
    719                                  call_cq_->cq(), notification_cq->cq(), this));
    720 }
    721 
    722 ServerInterface::GenericAsyncRequest::GenericAsyncRequest(
    723     ServerInterface* server, GenericServerContext* context,
    724     internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
    725     ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
    726     : BaseAsyncRequest(server, context, stream, call_cq, tag,
    727                        delete_on_finalize) {
    728   grpc_call_details_init(&call_details_);
    729   GPR_ASSERT(notification_cq);
    730   GPR_ASSERT(call_cq);
    731   GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
    732                                  server->server(), &call_, &call_details_,
    733                                  context->client_metadata_.arr(), call_cq->cq(),
    734                                  notification_cq->cq(), this));
    735 }
    736 
    737 bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag,
    738                                                           bool* status) {
    739   // TODO(yangg) remove the copy here.
    740   if (*status) {
    741     static_cast<GenericServerContext*>(context_)->method_ =
    742         StringFromCopiedSlice(call_details_.method);
    743     static_cast<GenericServerContext*>(context_)->host_ =
    744         StringFromCopiedSlice(call_details_.host);
    745     context_->deadline_ = call_details_.deadline;
    746   }
    747   grpc_slice_unref(call_details_.method);
    748   grpc_slice_unref(call_details_.host);
    749   return BaseAsyncRequest::FinalizeResult(tag, status);
    750 }
    751 
    752 bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag,
    753                                                        bool* status) {
    754   if (GenericAsyncRequest::FinalizeResult(tag, status) && *status) {
    755     new UnimplementedAsyncRequest(server_, cq_);
    756     new UnimplementedAsyncResponse(this);
    757   } else {
    758     delete this;
    759   }
    760   return false;
    761 }
    762 
    763 Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
    764     UnimplementedAsyncRequest* request)
    765     : request_(request) {
    766   Status status(StatusCode::UNIMPLEMENTED, "");
    767   internal::UnknownMethodHandler::FillOps(request_->context(), this);
    768   request_->stream()->call_.PerformOps(this);
    769 }
    770 
    771 ServerInitializer* Server::initializer() { return server_initializer_.get(); }
    772 
    773 }  // namespace grpc
    774