Home | History | Annotate | Download | only in codegen
      1 /*
      2  *
      3  * Copyright 2015 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H
     20 #define GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H
     21 
     22 #include <grpc/impl/codegen/grpc_types.h>
     23 #include <grpcpp/impl/codegen/byte_buffer.h>
     24 #include <grpcpp/impl/codegen/call_hook.h>
     25 #include <grpcpp/impl/codegen/completion_queue_tag.h>
     26 #include <grpcpp/impl/codegen/core_codegen_interface.h>
     27 #include <grpcpp/impl/codegen/rpc_service_method.h>
     28 
     29 namespace grpc {
     30 
// Forward declarations. Where this header uses these types it does so only
// through pointers or friend declarations, so full definitions are not needed.
class AsyncGenericService;
class Channel;
class GenericServerContext;
class ServerCompletionQueue;
class ServerContext;
class ServerCredentials;
class Service;

// Declared here, defined elsewhere. Code in this header calls through it to
// reach core functions (gpr_inf_future, grpc_call_cancel_with_status,
// grpc_call_unref).
extern CoreCodegenInterface* g_core_codegen_interface;
     40 
namespace internal {
// Forward declaration; this header only refers to the type through pointers.
class ServerAsyncStreamingInterface;
}  // namespace internal

/// Models a gRPC server.
///
/// Servers are configured and started via \a grpc::ServerBuilder.
     48 class ServerInterface : public internal::CallHook {
     49  public:
     50   virtual ~ServerInterface() {}
     51 
     52   /// \a Shutdown does the following things:
     53   ///
     54   /// 1. Shutdown the server: deactivate all listening ports, mark it in
     55   ///    "shutdown mode" so that further call Request's or incoming RPC matches
     56   ///    are no longer allowed. Also return all Request'ed-but-not-yet-active
     57   ///    calls as failed (!ok). This refers to calls that have been requested
     58   ///    at the server by the server-side library or application code but that
     59   ///    have not yet been matched to incoming RPCs from the client. Note that
     60   ///    this would even include default calls added automatically by the gRPC
     61   ///    C++ API without the user's input (e.g., "Unimplemented RPC method")
     62   ///
     63   /// 2. Block until all rpc method handlers invoked automatically by the sync
     64   ///    API finish.
     65   ///
     66   /// 3. If all pending calls complete (and all their operations are
     67   ///    retrieved by Next) before \a deadline expires, this finishes
     68   ///    gracefully. Otherwise, forcefully cancel all pending calls associated
     69   ///    with the server after \a deadline expires. In the case of the sync API,
     70   ///    if the RPC function for a streaming call has already been started and
     71   ///    takes a week to complete, the RPC function won't be forcefully
     72   ///    terminated (since that would leave state corrupt and incomplete) and
     73   ///    the method handler will just keep running (which will prevent the
     74   ///    server from completing the "join" operation that it needs to do at
     75   ///    shutdown time).
     76   ///
     77   /// All completion queue associated with the server (for example, for async
     78   /// serving) must be shutdown *after* this method has returned:
     79   /// See \a ServerBuilder::AddCompletionQueue for details.
     80   /// They must also be drained (by repeated Next) after being shutdown.
     81   ///
     82   /// \param deadline How long to wait until pending rpcs are forcefully
     83   /// terminated.
     84   template <class T>
     85   void Shutdown(const T& deadline) {
     86     ShutdownInternal(TimePoint<T>(deadline).raw_time());
     87   }
     88 
     89   /// Shutdown the server without a deadline and forced cancellation.
     90   ///
     91   /// All completion queue associated with the server (for example, for async
     92   /// serving) must be shutdown *after* this method has returned:
     93   /// See \a ServerBuilder::AddCompletionQueue for details.
     94   void Shutdown() {
     95     ShutdownInternal(
     96         g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_MONOTONIC));
     97   }
     98 
  /// Block waiting for all work to complete.
  ///
  /// \warning For this function to ever return, either the server must
  /// already be shutting down or some other thread must call \a Shutdown.
  virtual void Wait() = 0;
    104 
    105  protected:
  // Gives ::grpc::Service access to the protected members declared below.
  friend class ::grpc::Service;

  /// Register a service. This call does not take ownership of the service.
  /// The service must exist for the lifetime of the Server instance.
  ///
  /// \param host NOTE(review): semantics are not visible in this header
  /// (presumably an optional host name the service is registered for) --
  /// confirm against the implementation.
  /// \param service The service to register; not owned by the server.
  /// \return NOTE(review): presumably true when registration succeeded --
  /// confirm against the implementation.
  virtual bool RegisterService(const grpc::string* host, Service* service) = 0;
    111 
  /// Register a generic service. This call does not take ownership of the
  /// service. The service must exist for the lifetime of the Server instance.
  ///
  /// \param service The generic service to register; not owned by the server.
  virtual void RegisterAsyncGenericService(AsyncGenericService* service) = 0;
    115 
  /// Tries to bind the server to the given \a addr.
  ///
  /// It can be invoked multiple times.
  ///
  /// \param addr The address to try to bind to the server (e.g.,
  /// localhost:1234, 192.168.1.1:31416, [::1]:27182, etc.).
  /// \param creds The credentials associated with the server.
  ///
  /// \return bound port number on success, 0 on failure.
  ///
  /// \warning It's an error to call this method on an already started server.
  virtual int AddListeningPort(const grpc::string& addr,
                               ServerCredentials* creds) = 0;
    129 
  /// Start the server.
  ///
  /// \param cqs Completion queues for handling asynchronous services. The
  /// caller is required to keep all completion queues live until the server is
  /// destroyed.
  /// \param num_cqs The number of completion queues that \a cqs holds.
  virtual void Start(ServerCompletionQueue** cqs, size_t num_cqs) = 0;
    137 
  /// Implementation hook called by both public \a Shutdown overloads.
  ///
  /// \param deadline The deadline as a raw gpr_timespec. The no-argument
  /// \a Shutdown passes an infinite-future deadline.
  virtual void ShutdownInternal(gpr_timespec deadline) = 0;
    139 
  /// NOTE(review): presumably the upper bound on the size of a message this
  /// server accepts -- units and any special values are not visible in this
  /// header; confirm against the implementation.
  virtual int max_receive_message_size() const = 0;
    141 
  /// Returns a pointer to a core \a grpc_server.
  /// NOTE(review): presumably the core server backing this object; ownership
  /// and lifetime of the returned pointer are not visible in this header --
  /// confirm against the implementation.
  virtual grpc_server* server() = 0;
    143 
  /// NOTE(review): presumably performs the operations in \a ops on \a call,
  /// and presumably re-declares the hook from internal::CallHook (the base
  /// class of ServerInterface); it is not marked override here -- confirm
  /// against call_hook.h.
  virtual void PerformOpsOnCall(internal::CallOpSetInterface* ops,
                                internal::Call* call) = 0;
    146 
  /// Common base of the completion-queue tags that the RequestAsync*Call
  /// helpers below allocate. It holds the state shared by every kind of
  /// request.
  ///
  /// NOTE(review): the constructor, destructor and FinalizeResult are defined
  /// outside this header. The member notes below are inferred from names and
  /// from how derived classes use the members -- confirm against the
  /// definitions.
  class BaseAsyncRequest : public internal::CompletionQueueTag {
   public:
    BaseAsyncRequest(ServerInterface* server, ServerContext* context,
                     internal::ServerAsyncStreamingInterface* stream,
                     CompletionQueue* call_cq, void* tag,
                     bool delete_on_finalize);
    virtual ~BaseAsyncRequest();

    bool FinalizeResult(void** tag, bool* status) override;

   protected:
    // Presumably copies of the like-named constructor arguments.
    ServerInterface* const server_;
    ServerContext* const context_;
    internal::ServerAsyncStreamingInterface* const stream_;
    CompletionQueue* const call_cq_;
    void* const tag_;
    // Presumably: whether FinalizeResult deletes this object.
    const bool delete_on_finalize_;
    // The only non-const member. PayloadAsyncRequest cancels and unrefs this
    // call when the request payload cannot be parsed.
    grpc_call* call_;
  };
    166 
  /// Base for the two request kinds that target a registered method
  /// (NoPayloadAsyncRequest and PayloadAsyncRequest). It adds
  /// \a IssueRequest, which both of them call from their constructors.
  class RegisteredAsyncRequest : public BaseAsyncRequest {
   public:
    RegisteredAsyncRequest(ServerInterface* server, ServerContext* context,
                           internal::ServerAsyncStreamingInterface* stream,
                           CompletionQueue* call_cq, void* tag);

    // uses BaseAsyncRequest::FinalizeResult

   protected:
    /// \param registered_method Opaque handle of the method to request a call
    /// for (callers in this header pass RpcServiceMethod::server_tag()).
    /// \param payload Where the received request bytes should be stored, or
    /// nullptr when the request carries no payload to receive.
    /// \param notification_cq NOTE(review): presumably the queue on which the
    /// arrival of the call is announced -- confirm against the definition.
    void IssueRequest(void* registered_method, grpc_byte_buffer** payload,
                      ServerCompletionQueue* notification_cq);
  };
    179 
    180   class NoPayloadAsyncRequest final : public RegisteredAsyncRequest {
    181    public:
    182     NoPayloadAsyncRequest(void* registered_method, ServerInterface* server,
    183                           ServerContext* context,
    184                           internal::ServerAsyncStreamingInterface* stream,
    185                           CompletionQueue* call_cq,
    186                           ServerCompletionQueue* notification_cq, void* tag)
    187         : RegisteredAsyncRequest(server, context, stream, call_cq, tag) {
    188       IssueRequest(registered_method, nullptr, notification_cq);
    189     }
    190 
    191     // uses RegisteredAsyncRequest::FinalizeResult
    192   };
    193 
    194   template <class Message>
    195   class PayloadAsyncRequest final : public RegisteredAsyncRequest {
    196    public:
    197     PayloadAsyncRequest(void* registered_method, ServerInterface* server,
    198                         ServerContext* context,
    199                         internal::ServerAsyncStreamingInterface* stream,
    200                         CompletionQueue* call_cq,
    201                         ServerCompletionQueue* notification_cq, void* tag,
    202                         Message* request)
    203         : RegisteredAsyncRequest(server, context, stream, call_cq, tag),
    204           registered_method_(registered_method),
    205           server_(server),
    206           context_(context),
    207           stream_(stream),
    208           call_cq_(call_cq),
    209           notification_cq_(notification_cq),
    210           tag_(tag),
    211           request_(request) {
    212       IssueRequest(registered_method, payload_.bbuf_ptr(), notification_cq);
    213     }
    214 
    215     ~PayloadAsyncRequest() {
    216       payload_.Release();  // We do not own the payload_
    217     }
    218 
    219     bool FinalizeResult(void** tag, bool* status) override {
    220       if (*status) {
    221         if (!payload_.Valid() || !SerializationTraits<Message>::Deserialize(
    222                                       payload_.bbuf_ptr(), request_)
    223                                       .ok()) {
    224           // If deserialization fails, we cancel the call and instantiate
    225           // a new instance of ourselves to request another call.  We then
    226           // return false, which prevents the call from being returned to
    227           // the application.
    228           g_core_codegen_interface->grpc_call_cancel_with_status(
    229               call_, GRPC_STATUS_INTERNAL, "Unable to parse request", nullptr);
    230           g_core_codegen_interface->grpc_call_unref(call_);
    231           new PayloadAsyncRequest(registered_method_, server_, context_,
    232                                   stream_, call_cq_, notification_cq_, tag_,
    233                                   request_);
    234           delete this;
    235           return false;
    236         }
    237       }
    238       return RegisteredAsyncRequest::FinalizeResult(tag, status);
    239     }
    240 
    241    private:
    242     void* const registered_method_;
    243     ServerInterface* const server_;
    244     ServerContext* const context_;
    245     internal::ServerAsyncStreamingInterface* const stream_;
    246     CompletionQueue* const call_cq_;
    247     ServerCompletionQueue* const notification_cq_;
    248     void* const tag_;
    249     Message* const request_;
    250     ByteBuffer payload_;
    251   };
    252 
  /// Request type allocated by \a RequestAsyncGenericCall. Unlike the
  /// registered request types, it derives directly from BaseAsyncRequest,
  /// takes the notification queue and the delete_on_finalize flag in its
  /// constructor, overrides FinalizeResult, and carries a grpc_call_details.
  ///
  /// NOTE(review): the constructor and FinalizeResult are defined outside this
  /// header; their behavior is not visible here.
  class GenericAsyncRequest : public BaseAsyncRequest {
   public:
    GenericAsyncRequest(ServerInterface* server, GenericServerContext* context,
                        internal::ServerAsyncStreamingInterface* stream,
                        CompletionQueue* call_cq,
                        ServerCompletionQueue* notification_cq, void* tag,
                        bool delete_on_finalize);

    bool FinalizeResult(void** tag, bool* status) override;

   private:
    // NOTE(review): presumably filled in by core when a call arrives.
    grpc_call_details call_details_;
  };
    266 
    267   template <class Message>
    268   void RequestAsyncCall(internal::RpcServiceMethod* method,
    269                         ServerContext* context,
    270                         internal::ServerAsyncStreamingInterface* stream,
    271                         CompletionQueue* call_cq,
    272                         ServerCompletionQueue* notification_cq, void* tag,
    273                         Message* message) {
    274     GPR_CODEGEN_ASSERT(method);
    275     new PayloadAsyncRequest<Message>(method->server_tag(), this, context,
    276                                      stream, call_cq, notification_cq, tag,
    277                                      message);
    278   }
    279 
    280   void RequestAsyncCall(internal::RpcServiceMethod* method,
    281                         ServerContext* context,
    282                         internal::ServerAsyncStreamingInterface* stream,
    283                         CompletionQueue* call_cq,
    284                         ServerCompletionQueue* notification_cq, void* tag) {
    285     GPR_CODEGEN_ASSERT(method);
    286     new NoPayloadAsyncRequest(method->server_tag(), this, context, stream,
    287                               call_cq, notification_cq, tag);
    288   }
    289 
    290   void RequestAsyncGenericCall(GenericServerContext* context,
    291                                internal::ServerAsyncStreamingInterface* stream,
    292                                CompletionQueue* call_cq,
    293                                ServerCompletionQueue* notification_cq,
    294                                void* tag) {
    295     new GenericAsyncRequest(this, context, stream, call_cq, notification_cq,
    296                             tag, true);
    297   }
    298 };
    299 
    300 }  // namespace grpc
    301 
    302 #endif  // GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H
    303