Home | History | Annotate | Download | only in codegen
      1 /*
      2  *
      3  * Copyright 2015 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #ifndef GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
     20 #define GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
     21 
     22 #include <grpcpp/impl/codegen/byte_buffer.h>
     23 #include <grpcpp/impl/codegen/core_codegen_interface.h>
     24 #include <grpcpp/impl/codegen/rpc_service_method.h>
     25 #include <grpcpp/impl/codegen/sync_stream.h>
     26 
     27 namespace grpc {
     28 
     29 namespace internal {
     30 
     31 // Invoke the method handler, fill in the status, and
     32 // return whether or not we finished safely (without an exception).
     33 // Note that exception handling is 0-cost in most compiler/library
     34 // implementations (except when an exception is actually thrown),
     35 // so this process doesn't require additional overhead in the common case.
     36 // Additionally, we don't need to return if we caught an exception or not;
     37 // the handling is the same in either case.
     38 template <class Callable>
     39 Status CatchingFunctionHandler(Callable&& handler) {
     40 #if GRPC_ALLOW_EXCEPTIONS
     41   try {
     42     return handler();
     43   } catch (...) {
     44     return Status(StatusCode::UNKNOWN, "Unexpected error in RPC handling");
     45   }
     46 #else   // GRPC_ALLOW_EXCEPTIONS
     47   return handler();
     48 #endif  // GRPC_ALLOW_EXCEPTIONS
     49 }
     50 
     51 /// A wrapper class of an application provided rpc method handler.
     52 template <class ServiceType, class RequestType, class ResponseType>
     53 class RpcMethodHandler : public MethodHandler {
     54  public:
     55   RpcMethodHandler(std::function<Status(ServiceType*, ServerContext*,
     56                                         const RequestType*, ResponseType*)>
     57                        func,
     58                    ServiceType* service)
     59       : func_(func), service_(service) {}
     60 
     61   void RunHandler(const HandlerParameter& param) final {
     62     RequestType req;
     63     Status status = SerializationTraits<RequestType>::Deserialize(
     64         param.request.bbuf_ptr(), &req);
     65     ResponseType rsp;
     66     if (status.ok()) {
     67       status = CatchingFunctionHandler([this, &param, &req, &rsp] {
     68         return func_(service_, param.server_context, &req, &rsp);
     69       });
     70     }
     71 
     72     GPR_CODEGEN_ASSERT(!param.server_context->sent_initial_metadata_);
     73     CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
     74               CallOpServerSendStatus>
     75         ops;
     76     ops.SendInitialMetadata(param.server_context->initial_metadata_,
     77                             param.server_context->initial_metadata_flags());
     78     if (param.server_context->compression_level_set()) {
     79       ops.set_compression_level(param.server_context->compression_level());
     80     }
     81     if (status.ok()) {
     82       status = ops.SendMessage(rsp);
     83     }
     84     ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
     85     param.call->PerformOps(&ops);
     86     param.call->cq()->Pluck(&ops);
     87   }
     88 
     89  private:
     90   /// Application provided rpc handler function.
     91   std::function<Status(ServiceType*, ServerContext*, const RequestType*,
     92                        ResponseType*)>
     93       func_;
     94   // The class the above handler function lives in.
     95   ServiceType* service_;
     96 };
     97 
     98 /// A wrapper class of an application provided client streaming handler.
     99 template <class ServiceType, class RequestType, class ResponseType>
    100 class ClientStreamingHandler : public MethodHandler {
    101  public:
    102   ClientStreamingHandler(
    103       std::function<Status(ServiceType*, ServerContext*,
    104                            ServerReader<RequestType>*, ResponseType*)>
    105           func,
    106       ServiceType* service)
    107       : func_(func), service_(service) {}
    108 
    109   void RunHandler(const HandlerParameter& param) final {
    110     ServerReader<RequestType> reader(param.call, param.server_context);
    111     ResponseType rsp;
    112     Status status = CatchingFunctionHandler([this, &param, &reader, &rsp] {
    113       return func_(service_, param.server_context, &reader, &rsp);
    114     });
    115 
    116     CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
    117               CallOpServerSendStatus>
    118         ops;
    119     if (!param.server_context->sent_initial_metadata_) {
    120       ops.SendInitialMetadata(param.server_context->initial_metadata_,
    121                               param.server_context->initial_metadata_flags());
    122       if (param.server_context->compression_level_set()) {
    123         ops.set_compression_level(param.server_context->compression_level());
    124       }
    125     }
    126     if (status.ok()) {
    127       status = ops.SendMessage(rsp);
    128     }
    129     ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
    130     param.call->PerformOps(&ops);
    131     param.call->cq()->Pluck(&ops);
    132   }
    133 
    134  private:
    135   std::function<Status(ServiceType*, ServerContext*, ServerReader<RequestType>*,
    136                        ResponseType*)>
    137       func_;
    138   ServiceType* service_;
    139 };
    140 
    141 /// A wrapper class of an application provided server streaming handler.
    142 template <class ServiceType, class RequestType, class ResponseType>
    143 class ServerStreamingHandler : public MethodHandler {
    144  public:
    145   ServerStreamingHandler(
    146       std::function<Status(ServiceType*, ServerContext*, const RequestType*,
    147                            ServerWriter<ResponseType>*)>
    148           func,
    149       ServiceType* service)
    150       : func_(func), service_(service) {}
    151 
    152   void RunHandler(const HandlerParameter& param) final {
    153     RequestType req;
    154     Status status = SerializationTraits<RequestType>::Deserialize(
    155         param.request.bbuf_ptr(), &req);
    156 
    157     if (status.ok()) {
    158       ServerWriter<ResponseType> writer(param.call, param.server_context);
    159       status = CatchingFunctionHandler([this, &param, &req, &writer] {
    160         return func_(service_, param.server_context, &req, &writer);
    161       });
    162     }
    163 
    164     CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
    165     if (!param.server_context->sent_initial_metadata_) {
    166       ops.SendInitialMetadata(param.server_context->initial_metadata_,
    167                               param.server_context->initial_metadata_flags());
    168       if (param.server_context->compression_level_set()) {
    169         ops.set_compression_level(param.server_context->compression_level());
    170       }
    171     }
    172     ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
    173     param.call->PerformOps(&ops);
    174     if (param.server_context->has_pending_ops_) {
    175       param.call->cq()->Pluck(&param.server_context->pending_ops_);
    176     }
    177     param.call->cq()->Pluck(&ops);
    178   }
    179 
    180  private:
    181   std::function<Status(ServiceType*, ServerContext*, const RequestType*,
    182                        ServerWriter<ResponseType>*)>
    183       func_;
    184   ServiceType* service_;
    185 };
    186 
    187 /// A wrapper class of an application provided bidi-streaming handler.
    188 /// This also applies to server-streamed implementation of a unary method
    189 /// with the additional requirement that such methods must have done a
    190 /// write for status to be ok
    191 /// Since this is used by more than 1 class, the service is not passed in.
    192 /// Instead, it is expected to be an implicitly-captured argument of func
    193 /// (through bind or something along those lines)
    194 template <class Streamer, bool WriteNeeded>
    195 class TemplatedBidiStreamingHandler : public MethodHandler {
    196  public:
    197   TemplatedBidiStreamingHandler(
    198       std::function<Status(ServerContext*, Streamer*)> func)
    199       : func_(func), write_needed_(WriteNeeded) {}
    200 
    201   void RunHandler(const HandlerParameter& param) final {
    202     Streamer stream(param.call, param.server_context);
    203     Status status = CatchingFunctionHandler([this, &param, &stream] {
    204       return func_(param.server_context, &stream);
    205     });
    206 
    207     CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
    208     if (!param.server_context->sent_initial_metadata_) {
    209       ops.SendInitialMetadata(param.server_context->initial_metadata_,
    210                               param.server_context->initial_metadata_flags());
    211       if (param.server_context->compression_level_set()) {
    212         ops.set_compression_level(param.server_context->compression_level());
    213       }
    214       if (write_needed_ && status.ok()) {
    215         // If we needed a write but never did one, we need to mark the
    216         // status as a fail
    217         status = Status(StatusCode::INTERNAL,
    218                         "Service did not provide response message");
    219       }
    220     }
    221     ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
    222     param.call->PerformOps(&ops);
    223     if (param.server_context->has_pending_ops_) {
    224       param.call->cq()->Pluck(&param.server_context->pending_ops_);
    225     }
    226     param.call->cq()->Pluck(&ops);
    227   }
    228 
    229  private:
    230   std::function<Status(ServerContext*, Streamer*)> func_;
    231   const bool write_needed_;
    232 };
    233 
    234 template <class ServiceType, class RequestType, class ResponseType>
    235 class BidiStreamingHandler
    236     : public TemplatedBidiStreamingHandler<
    237           ServerReaderWriter<ResponseType, RequestType>, false> {
    238  public:
    239   BidiStreamingHandler(
    240       std::function<Status(ServiceType*, ServerContext*,
    241                            ServerReaderWriter<ResponseType, RequestType>*)>
    242           func,
    243       ServiceType* service)
    244       : TemplatedBidiStreamingHandler<
    245             ServerReaderWriter<ResponseType, RequestType>, false>(std::bind(
    246             func, service, std::placeholders::_1, std::placeholders::_2)) {}
    247 };
    248 
    249 template <class RequestType, class ResponseType>
    250 class StreamedUnaryHandler
    251     : public TemplatedBidiStreamingHandler<
    252           ServerUnaryStreamer<RequestType, ResponseType>, true> {
    253  public:
    254   explicit StreamedUnaryHandler(
    255       std::function<Status(ServerContext*,
    256                            ServerUnaryStreamer<RequestType, ResponseType>*)>
    257           func)
    258       : TemplatedBidiStreamingHandler<
    259             ServerUnaryStreamer<RequestType, ResponseType>, true>(func) {}
    260 };
    261 
    262 template <class RequestType, class ResponseType>
    263 class SplitServerStreamingHandler
    264     : public TemplatedBidiStreamingHandler<
    265           ServerSplitStreamer<RequestType, ResponseType>, false> {
    266  public:
    267   explicit SplitServerStreamingHandler(
    268       std::function<Status(ServerContext*,
    269                            ServerSplitStreamer<RequestType, ResponseType>*)>
    270           func)
    271       : TemplatedBidiStreamingHandler<
    272             ServerSplitStreamer<RequestType, ResponseType>, false>(func) {}
    273 };
    274 
    275 /// General method handler class for errors that prevent real method use
    276 /// e.g., handle unknown method by returning UNIMPLEMENTED error.
    277 template <StatusCode code>
    278 class ErrorMethodHandler : public MethodHandler {
    279  public:
    280   template <class T>
    281   static void FillOps(ServerContext* context, T* ops) {
    282     Status status(code, "");
    283     if (!context->sent_initial_metadata_) {
    284       ops->SendInitialMetadata(context->initial_metadata_,
    285                                context->initial_metadata_flags());
    286       if (context->compression_level_set()) {
    287         ops->set_compression_level(context->compression_level());
    288       }
    289       context->sent_initial_metadata_ = true;
    290     }
    291     ops->ServerSendStatus(context->trailing_metadata_, status);
    292   }
    293 
    294   void RunHandler(const HandlerParameter& param) final {
    295     CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
    296     FillOps(param.server_context, &ops);
    297     param.call->PerformOps(&ops);
    298     param.call->cq()->Pluck(&ops);
    299     // We also have to destroy any request payload in the handler parameter
    300     ByteBuffer* payload = param.request.bbuf_ptr();
    301     if (payload != nullptr) {
    302       payload->Clear();
    303     }
    304   }
    305 };
    306 
    307 typedef ErrorMethodHandler<StatusCode::UNIMPLEMENTED> UnknownMethodHandler;
    308 typedef ErrorMethodHandler<StatusCode::RESOURCE_EXHAUSTED>
    309     ResourceExhaustedHandler;
    310 
    311 }  // namespace internal
    312 }  // namespace grpc
    313 
    314 #endif  // GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
    315