1 /* 2 * 3 * Copyright 2016 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPCXX_CHANNEL_FILTER_H 20 #define GRPCXX_CHANNEL_FILTER_H 21 22 #include <grpc/grpc.h> 23 #include <grpc/support/alloc.h> 24 #include <grpcpp/impl/codegen/config.h> 25 26 #include <functional> 27 #include <vector> 28 29 #include "src/core/lib/channel/channel_stack.h" 30 #include "src/core/lib/surface/channel_init.h" 31 #include "src/core/lib/transport/metadata_batch.h" 32 33 /// An interface to define filters. 34 /// 35 /// To define a filter, implement a subclass of each of \c CallData and 36 /// \c ChannelData. Then register the filter using something like this: 37 /// \code{.cpp} 38 /// RegisterChannelFilter<MyChannelDataSubclass, MyCallDataSubclass>( 39 /// "name-of-filter", GRPC_SERVER_CHANNEL, INT_MAX, nullptr); 40 /// \endcode 41 42 namespace grpc { 43 44 /// A C++ wrapper for the \c grpc_metadata_batch struct. 45 class MetadataBatch { 46 public: 47 /// Borrows a pointer to \a batch, but does NOT take ownership. 48 /// The caller must ensure that \a batch continues to exist for as 49 /// long as the MetadataBatch object does. 50 explicit MetadataBatch(grpc_metadata_batch* batch) : batch_(batch) {} 51 52 grpc_metadata_batch* batch() const { return batch_; } 53 54 /// Adds metadata and returns the newly allocated storage. 55 /// The caller takes ownership of the result, which must exist for the 56 /// lifetime of the gRPC call. 57 grpc_linked_mdelem* AddMetadata(const string& key, const string& value); 58 59 class const_iterator : public std::iterator<std::bidirectional_iterator_tag, 60 const grpc_mdelem> { 61 public: 62 const grpc_mdelem& operator*() const { return elem_->md; } 63 const grpc_mdelem operator->() const { return elem_->md; } 64 65 const_iterator& operator++() { 66 elem_ = elem_->next; 67 return *this; 68 } 69 const_iterator operator++(int) { 70 const_iterator tmp(*this); 71 operator++(); 72 return tmp; 73 } 74 const_iterator& operator--() { 75 elem_ = elem_->prev; 76 return *this; 77 } 78 const_iterator operator--(int) { 79 const_iterator tmp(*this); 80 operator--(); 81 return tmp; 82 } 83 84 bool operator==(const const_iterator& other) const { 85 return elem_ == other.elem_; 86 } 87 bool operator!=(const const_iterator& other) const { 88 return elem_ != other.elem_; 89 } 90 91 private: 92 friend class MetadataBatch; 93 explicit const_iterator(grpc_linked_mdelem* elem) : elem_(elem) {} 94 95 grpc_linked_mdelem* elem_; 96 }; 97 98 const_iterator begin() const { return const_iterator(batch_->list.head); } 99 const_iterator end() const { return const_iterator(nullptr); } 100 101 private: 102 grpc_metadata_batch* batch_; // Not owned. 103 }; 104 105 /// A C++ wrapper for the \c grpc_transport_op struct. 106 class TransportOp { 107 public: 108 /// Borrows a pointer to \a op, but does NOT take ownership. 109 /// The caller must ensure that \a op continues to exist for as 110 /// long as the TransportOp object does. 111 explicit TransportOp(grpc_transport_op* op) : op_(op) {} 112 113 grpc_transport_op* op() const { return op_; } 114 115 // TODO(roth): Add a C++ wrapper for grpc_error? 116 grpc_error* disconnect_with_error() const { 117 return op_->disconnect_with_error; 118 } 119 bool send_goaway() const { return op_->goaway_error != GRPC_ERROR_NONE; } 120 121 // TODO(roth): Add methods for additional fields as needed. 122 123 private: 124 grpc_transport_op* op_; // Not owned. 125 }; 126 127 /// A C++ wrapper for the \c grpc_transport_stream_op_batch struct. 128 class TransportStreamOpBatch { 129 public: 130 /// Borrows a pointer to \a op, but does NOT take ownership. 131 /// The caller must ensure that \a op continues to exist for as 132 /// long as the TransportStreamOpBatch object does. 133 explicit TransportStreamOpBatch(grpc_transport_stream_op_batch* op) 134 : op_(op), 135 send_initial_metadata_( 136 op->send_initial_metadata 137 ? op->payload->send_initial_metadata.send_initial_metadata 138 : nullptr), 139 send_trailing_metadata_( 140 op->send_trailing_metadata 141 ? op->payload->send_trailing_metadata.send_trailing_metadata 142 : nullptr), 143 recv_initial_metadata_( 144 op->recv_initial_metadata 145 ? op->payload->recv_initial_metadata.recv_initial_metadata 146 : nullptr), 147 recv_trailing_metadata_( 148 op->recv_trailing_metadata 149 ? op->payload->recv_trailing_metadata.recv_trailing_metadata 150 : nullptr) {} 151 152 grpc_transport_stream_op_batch* op() const { return op_; } 153 154 grpc_closure* on_complete() const { return op_->on_complete; } 155 void set_on_complete(grpc_closure* closure) { op_->on_complete = closure; } 156 157 MetadataBatch* send_initial_metadata() { 158 return op_->send_initial_metadata ? &send_initial_metadata_ : nullptr; 159 } 160 MetadataBatch* send_trailing_metadata() { 161 return op_->send_trailing_metadata ? &send_trailing_metadata_ : nullptr; 162 } 163 MetadataBatch* recv_initial_metadata() { 164 return op_->recv_initial_metadata ? &recv_initial_metadata_ : nullptr; 165 } 166 MetadataBatch* recv_trailing_metadata() { 167 return op_->recv_trailing_metadata ? &recv_trailing_metadata_ : nullptr; 168 } 169 170 uint32_t* send_initial_metadata_flags() const { 171 return op_->send_initial_metadata ? &op_->payload->send_initial_metadata 172 .send_initial_metadata_flags 173 : nullptr; 174 } 175 176 grpc_closure* recv_initial_metadata_ready() const { 177 return op_->recv_initial_metadata 178 ? op_->payload->recv_initial_metadata.recv_initial_metadata_ready 179 : nullptr; 180 } 181 void set_recv_initial_metadata_ready(grpc_closure* closure) { 182 op_->payload->recv_initial_metadata.recv_initial_metadata_ready = closure; 183 } 184 185 grpc_core::OrphanablePtr<grpc_core::ByteStream>* send_message() const { 186 return op_->send_message ? &op_->payload->send_message.send_message 187 : nullptr; 188 } 189 void set_send_message( 190 grpc_core::OrphanablePtr<grpc_core::ByteStream> send_message) { 191 op_->send_message = true; 192 op_->payload->send_message.send_message = std::move(send_message); 193 } 194 195 grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message() const { 196 return op_->recv_message ? op_->payload->recv_message.recv_message 197 : nullptr; 198 } 199 void set_recv_message( 200 grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message) { 201 op_->recv_message = true; 202 op_->payload->recv_message.recv_message = recv_message; 203 } 204 205 census_context* get_census_context() const { 206 return static_cast<census_context*>( 207 op_->payload->context[GRPC_CONTEXT_TRACING].value); 208 } 209 210 const gpr_atm* get_peer_string() const { 211 if (op_->send_initial_metadata && 212 op_->payload->send_initial_metadata.peer_string != nullptr) { 213 return op_->payload->send_initial_metadata.peer_string; 214 } else if (op_->recv_initial_metadata && 215 op_->payload->recv_initial_metadata.peer_string != nullptr) { 216 return op_->payload->recv_initial_metadata.peer_string; 217 } else { 218 return nullptr; 219 } 220 } 221 222 private: 223 grpc_transport_stream_op_batch* op_; // Not owned. 224 MetadataBatch send_initial_metadata_; 225 MetadataBatch send_trailing_metadata_; 226 MetadataBatch recv_initial_metadata_; 227 MetadataBatch recv_trailing_metadata_; 228 }; 229 230 /// Represents channel data. 231 class ChannelData { 232 public: 233 ChannelData() {} 234 virtual ~ChannelData() {} 235 236 // TODO(roth): Come up with a more C++-like API for the channel element. 237 238 /// Initializes the channel data. 239 virtual grpc_error* Init(grpc_channel_element* elem, 240 grpc_channel_element_args* args) { 241 return GRPC_ERROR_NONE; 242 } 243 244 // Called before destruction. 245 virtual void Destroy(grpc_channel_element* elem) {} 246 247 virtual void StartTransportOp(grpc_channel_element* elem, TransportOp* op); 248 249 virtual void GetInfo(grpc_channel_element* elem, 250 const grpc_channel_info* channel_info); 251 }; 252 253 /// Represents call data. 254 class CallData { 255 public: 256 CallData() {} 257 virtual ~CallData() {} 258 259 // TODO(roth): Come up with a more C++-like API for the call element. 260 261 /// Initializes the call data. 262 virtual grpc_error* Init(grpc_call_element* elem, 263 const grpc_call_element_args* args) { 264 return GRPC_ERROR_NONE; 265 } 266 267 // Called before destruction. 268 virtual void Destroy(grpc_call_element* elem, 269 const grpc_call_final_info* final_info, 270 grpc_closure* then_call_closure) {} 271 272 /// Starts a new stream operation. 273 virtual void StartTransportStreamOpBatch(grpc_call_element* elem, 274 TransportStreamOpBatch* op); 275 276 /// Sets a pollset or pollset set. 277 virtual void SetPollsetOrPollsetSet(grpc_call_element* elem, 278 grpc_polling_entity* pollent); 279 }; 280 281 namespace internal { 282 283 // Defines static members for passing to C core. 284 // Members of this class correspond to the members of the C 285 // grpc_channel_filter struct. 286 template <typename ChannelDataType, typename CallDataType> 287 class ChannelFilter final { 288 public: 289 static const size_t channel_data_size = sizeof(ChannelDataType); 290 291 static grpc_error* InitChannelElement(grpc_channel_element* elem, 292 grpc_channel_element_args* args) { 293 // Construct the object in the already-allocated memory. 294 ChannelDataType* channel_data = new (elem->channel_data) ChannelDataType(); 295 return channel_data->Init(elem, args); 296 } 297 298 static void DestroyChannelElement(grpc_channel_element* elem) { 299 ChannelDataType* channel_data = 300 static_cast<ChannelDataType*>(elem->channel_data); 301 channel_data->Destroy(elem); 302 channel_data->~ChannelDataType(); 303 } 304 305 static void StartTransportOp(grpc_channel_element* elem, 306 grpc_transport_op* op) { 307 ChannelDataType* channel_data = 308 static_cast<ChannelDataType*>(elem->channel_data); 309 TransportOp op_wrapper(op); 310 channel_data->StartTransportOp(elem, &op_wrapper); 311 } 312 313 static void GetChannelInfo(grpc_channel_element* elem, 314 const grpc_channel_info* channel_info) { 315 ChannelDataType* channel_data = 316 static_cast<ChannelDataType*>(elem->channel_data); 317 channel_data->GetInfo(elem, channel_info); 318 } 319 320 static const size_t call_data_size = sizeof(CallDataType); 321 322 static grpc_error* InitCallElement(grpc_call_element* elem, 323 const grpc_call_element_args* args) { 324 // Construct the object in the already-allocated memory. 325 CallDataType* call_data = new (elem->call_data) CallDataType(); 326 return call_data->Init(elem, args); 327 } 328 329 static void DestroyCallElement(grpc_call_element* elem, 330 const grpc_call_final_info* final_info, 331 grpc_closure* then_call_closure) { 332 CallDataType* call_data = static_cast<CallDataType*>(elem->call_data); 333 call_data->Destroy(elem, final_info, then_call_closure); 334 call_data->~CallDataType(); 335 } 336 337 static void StartTransportStreamOpBatch(grpc_call_element* elem, 338 grpc_transport_stream_op_batch* op) { 339 CallDataType* call_data = static_cast<CallDataType*>(elem->call_data); 340 TransportStreamOpBatch op_wrapper(op); 341 call_data->StartTransportStreamOpBatch(elem, &op_wrapper); 342 } 343 344 static void SetPollsetOrPollsetSet(grpc_call_element* elem, 345 grpc_polling_entity* pollent) { 346 CallDataType* call_data = static_cast<CallDataType*>(elem->call_data); 347 call_data->SetPollsetOrPollsetSet(elem, pollent); 348 } 349 }; 350 351 struct FilterRecord { 352 grpc_channel_stack_type stack_type; 353 int priority; 354 std::function<bool(const grpc_channel_args&)> include_filter; 355 grpc_channel_filter filter; 356 }; 357 extern std::vector<FilterRecord>* channel_filters; 358 359 void ChannelFilterPluginInit(); 360 void ChannelFilterPluginShutdown(); 361 362 } // namespace internal 363 364 /// Registers a new filter. 365 /// Must be called by only one thread at a time. 366 /// The \a include_filter argument specifies a function that will be called 367 /// to determine at run-time whether or not to add the filter. If the 368 /// value is nullptr, the filter will be added unconditionally. 369 template <typename ChannelDataType, typename CallDataType> 370 void RegisterChannelFilter( 371 const char* name, grpc_channel_stack_type stack_type, int priority, 372 std::function<bool(const grpc_channel_args&)> include_filter) { 373 // If we haven't been called before, initialize channel_filters and 374 // call grpc_register_plugin(). 375 if (internal::channel_filters == nullptr) { 376 grpc_register_plugin(internal::ChannelFilterPluginInit, 377 internal::ChannelFilterPluginShutdown); 378 internal::channel_filters = new std::vector<internal::FilterRecord>(); 379 } 380 // Add an entry to channel_filters. The filter will be added when the 381 // C-core initialization code calls ChannelFilterPluginInit(). 382 typedef internal::ChannelFilter<ChannelDataType, CallDataType> FilterType; 383 internal::FilterRecord filter_record = { 384 stack_type, 385 priority, 386 include_filter, 387 {FilterType::StartTransportStreamOpBatch, FilterType::StartTransportOp, 388 FilterType::call_data_size, FilterType::InitCallElement, 389 FilterType::SetPollsetOrPollsetSet, FilterType::DestroyCallElement, 390 FilterType::channel_data_size, FilterType::InitChannelElement, 391 FilterType::DestroyChannelElement, FilterType::GetChannelInfo, name}}; 392 internal::channel_filters->push_back(filter_record); 393 } 394 395 } // namespace grpc 396 397 #endif // GRPCXX_CHANNEL_FILTER_H 398