1 /* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H 20 #define GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H 21 22 #include <grpcpp/impl/codegen/byte_buffer.h> 23 #include <grpcpp/impl/codegen/core_codegen_interface.h> 24 #include <grpcpp/impl/codegen/rpc_service_method.h> 25 #include <grpcpp/impl/codegen/sync_stream.h> 26 27 namespace grpc { 28 29 namespace internal { 30 31 // Invoke the method handler, fill in the status, and 32 // return whether or not we finished safely (without an exception). 33 // Note that exception handling is 0-cost in most compiler/library 34 // implementations (except when an exception is actually thrown), 35 // so this process doesn't require additional overhead in the common case. 36 // Additionally, we don't need to return if we caught an exception or not; 37 // the handling is the same in either case. 38 template <class Callable> 39 Status CatchingFunctionHandler(Callable&& handler) { 40 #if GRPC_ALLOW_EXCEPTIONS 41 try { 42 return handler(); 43 } catch (...) { 44 return Status(StatusCode::UNKNOWN, "Unexpected error in RPC handling"); 45 } 46 #else // GRPC_ALLOW_EXCEPTIONS 47 return handler(); 48 #endif // GRPC_ALLOW_EXCEPTIONS 49 } 50 51 /// A wrapper class of an application provided rpc method handler. 52 template <class ServiceType, class RequestType, class ResponseType> 53 class RpcMethodHandler : public MethodHandler { 54 public: 55 RpcMethodHandler(std::function<Status(ServiceType*, ServerContext*, 56 const RequestType*, ResponseType*)> 57 func, 58 ServiceType* service) 59 : func_(func), service_(service) {} 60 61 void RunHandler(const HandlerParameter& param) final { 62 RequestType req; 63 Status status = SerializationTraits<RequestType>::Deserialize( 64 param.request.bbuf_ptr(), &req); 65 ResponseType rsp; 66 if (status.ok()) { 67 status = CatchingFunctionHandler([this, ¶m, &req, &rsp] { 68 return func_(service_, param.server_context, &req, &rsp); 69 }); 70 } 71 72 GPR_CODEGEN_ASSERT(!param.server_context->sent_initial_metadata_); 73 CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, 74 CallOpServerSendStatus> 75 ops; 76 ops.SendInitialMetadata(param.server_context->initial_metadata_, 77 param.server_context->initial_metadata_flags()); 78 if (param.server_context->compression_level_set()) { 79 ops.set_compression_level(param.server_context->compression_level()); 80 } 81 if (status.ok()) { 82 status = ops.SendMessage(rsp); 83 } 84 ops.ServerSendStatus(param.server_context->trailing_metadata_, status); 85 param.call->PerformOps(&ops); 86 param.call->cq()->Pluck(&ops); 87 } 88 89 private: 90 /// Application provided rpc handler function. 91 std::function<Status(ServiceType*, ServerContext*, const RequestType*, 92 ResponseType*)> 93 func_; 94 // The class the above handler function lives in. 95 ServiceType* service_; 96 }; 97 98 /// A wrapper class of an application provided client streaming handler. 99 template <class ServiceType, class RequestType, class ResponseType> 100 class ClientStreamingHandler : public MethodHandler { 101 public: 102 ClientStreamingHandler( 103 std::function<Status(ServiceType*, ServerContext*, 104 ServerReader<RequestType>*, ResponseType*)> 105 func, 106 ServiceType* service) 107 : func_(func), service_(service) {} 108 109 void RunHandler(const HandlerParameter& param) final { 110 ServerReader<RequestType> reader(param.call, param.server_context); 111 ResponseType rsp; 112 Status status = CatchingFunctionHandler([this, ¶m, &reader, &rsp] { 113 return func_(service_, param.server_context, &reader, &rsp); 114 }); 115 116 CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, 117 CallOpServerSendStatus> 118 ops; 119 if (!param.server_context->sent_initial_metadata_) { 120 ops.SendInitialMetadata(param.server_context->initial_metadata_, 121 param.server_context->initial_metadata_flags()); 122 if (param.server_context->compression_level_set()) { 123 ops.set_compression_level(param.server_context->compression_level()); 124 } 125 } 126 if (status.ok()) { 127 status = ops.SendMessage(rsp); 128 } 129 ops.ServerSendStatus(param.server_context->trailing_metadata_, status); 130 param.call->PerformOps(&ops); 131 param.call->cq()->Pluck(&ops); 132 } 133 134 private: 135 std::function<Status(ServiceType*, ServerContext*, ServerReader<RequestType>*, 136 ResponseType*)> 137 func_; 138 ServiceType* service_; 139 }; 140 141 /// A wrapper class of an application provided server streaming handler. 142 template <class ServiceType, class RequestType, class ResponseType> 143 class ServerStreamingHandler : public MethodHandler { 144 public: 145 ServerStreamingHandler( 146 std::function<Status(ServiceType*, ServerContext*, const RequestType*, 147 ServerWriter<ResponseType>*)> 148 func, 149 ServiceType* service) 150 : func_(func), service_(service) {} 151 152 void RunHandler(const HandlerParameter& param) final { 153 RequestType req; 154 Status status = SerializationTraits<RequestType>::Deserialize( 155 param.request.bbuf_ptr(), &req); 156 157 if (status.ok()) { 158 ServerWriter<ResponseType> writer(param.call, param.server_context); 159 status = CatchingFunctionHandler([this, ¶m, &req, &writer] { 160 return func_(service_, param.server_context, &req, &writer); 161 }); 162 } 163 164 CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops; 165 if (!param.server_context->sent_initial_metadata_) { 166 ops.SendInitialMetadata(param.server_context->initial_metadata_, 167 param.server_context->initial_metadata_flags()); 168 if (param.server_context->compression_level_set()) { 169 ops.set_compression_level(param.server_context->compression_level()); 170 } 171 } 172 ops.ServerSendStatus(param.server_context->trailing_metadata_, status); 173 param.call->PerformOps(&ops); 174 if (param.server_context->has_pending_ops_) { 175 param.call->cq()->Pluck(¶m.server_context->pending_ops_); 176 } 177 param.call->cq()->Pluck(&ops); 178 } 179 180 private: 181 std::function<Status(ServiceType*, ServerContext*, const RequestType*, 182 ServerWriter<ResponseType>*)> 183 func_; 184 ServiceType* service_; 185 }; 186 187 /// A wrapper class of an application provided bidi-streaming handler. 188 /// This also applies to server-streamed implementation of a unary method 189 /// with the additional requirement that such methods must have done a 190 /// write for status to be ok 191 /// Since this is used by more than 1 class, the service is not passed in. 192 /// Instead, it is expected to be an implicitly-captured argument of func 193 /// (through bind or something along those lines) 194 template <class Streamer, bool WriteNeeded> 195 class TemplatedBidiStreamingHandler : public MethodHandler { 196 public: 197 TemplatedBidiStreamingHandler( 198 std::function<Status(ServerContext*, Streamer*)> func) 199 : func_(func), write_needed_(WriteNeeded) {} 200 201 void RunHandler(const HandlerParameter& param) final { 202 Streamer stream(param.call, param.server_context); 203 Status status = CatchingFunctionHandler([this, ¶m, &stream] { 204 return func_(param.server_context, &stream); 205 }); 206 207 CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops; 208 if (!param.server_context->sent_initial_metadata_) { 209 ops.SendInitialMetadata(param.server_context->initial_metadata_, 210 param.server_context->initial_metadata_flags()); 211 if (param.server_context->compression_level_set()) { 212 ops.set_compression_level(param.server_context->compression_level()); 213 } 214 if (write_needed_ && status.ok()) { 215 // If we needed a write but never did one, we need to mark the 216 // status as a fail 217 status = Status(StatusCode::INTERNAL, 218 "Service did not provide response message"); 219 } 220 } 221 ops.ServerSendStatus(param.server_context->trailing_metadata_, status); 222 param.call->PerformOps(&ops); 223 if (param.server_context->has_pending_ops_) { 224 param.call->cq()->Pluck(¶m.server_context->pending_ops_); 225 } 226 param.call->cq()->Pluck(&ops); 227 } 228 229 private: 230 std::function<Status(ServerContext*, Streamer*)> func_; 231 const bool write_needed_; 232 }; 233 234 template <class ServiceType, class RequestType, class ResponseType> 235 class BidiStreamingHandler 236 : public TemplatedBidiStreamingHandler< 237 ServerReaderWriter<ResponseType, RequestType>, false> { 238 public: 239 BidiStreamingHandler( 240 std::function<Status(ServiceType*, ServerContext*, 241 ServerReaderWriter<ResponseType, RequestType>*)> 242 func, 243 ServiceType* service) 244 : TemplatedBidiStreamingHandler< 245 ServerReaderWriter<ResponseType, RequestType>, false>(std::bind( 246 func, service, std::placeholders::_1, std::placeholders::_2)) {} 247 }; 248 249 template <class RequestType, class ResponseType> 250 class StreamedUnaryHandler 251 : public TemplatedBidiStreamingHandler< 252 ServerUnaryStreamer<RequestType, ResponseType>, true> { 253 public: 254 explicit StreamedUnaryHandler( 255 std::function<Status(ServerContext*, 256 ServerUnaryStreamer<RequestType, ResponseType>*)> 257 func) 258 : TemplatedBidiStreamingHandler< 259 ServerUnaryStreamer<RequestType, ResponseType>, true>(func) {} 260 }; 261 262 template <class RequestType, class ResponseType> 263 class SplitServerStreamingHandler 264 : public TemplatedBidiStreamingHandler< 265 ServerSplitStreamer<RequestType, ResponseType>, false> { 266 public: 267 explicit SplitServerStreamingHandler( 268 std::function<Status(ServerContext*, 269 ServerSplitStreamer<RequestType, ResponseType>*)> 270 func) 271 : TemplatedBidiStreamingHandler< 272 ServerSplitStreamer<RequestType, ResponseType>, false>(func) {} 273 }; 274 275 /// General method handler class for errors that prevent real method use 276 /// e.g., handle unknown method by returning UNIMPLEMENTED error. 277 template <StatusCode code> 278 class ErrorMethodHandler : public MethodHandler { 279 public: 280 template <class T> 281 static void FillOps(ServerContext* context, T* ops) { 282 Status status(code, ""); 283 if (!context->sent_initial_metadata_) { 284 ops->SendInitialMetadata(context->initial_metadata_, 285 context->initial_metadata_flags()); 286 if (context->compression_level_set()) { 287 ops->set_compression_level(context->compression_level()); 288 } 289 context->sent_initial_metadata_ = true; 290 } 291 ops->ServerSendStatus(context->trailing_metadata_, status); 292 } 293 294 void RunHandler(const HandlerParameter& param) final { 295 CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops; 296 FillOps(param.server_context, &ops); 297 param.call->PerformOps(&ops); 298 param.call->cq()->Pluck(&ops); 299 // We also have to destroy any request payload in the handler parameter 300 ByteBuffer* payload = param.request.bbuf_ptr(); 301 if (payload != nullptr) { 302 payload->Clear(); 303 } 304 } 305 }; 306 307 typedef ErrorMethodHandler<StatusCode::UNIMPLEMENTED> UnknownMethodHandler; 308 typedef ErrorMethodHandler<StatusCode::RESOURCE_EXHAUSTED> 309 ResourceExhaustedHandler; 310 311 } // namespace internal 312 } // namespace grpc 313 314 #endif // GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H 315