Home | History | Annotate | Download | only in common
      1 /*
      2  *
      3  * Copyright 2016 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #ifndef GRPCXX_CHANNEL_FILTER_H
     20 #define GRPCXX_CHANNEL_FILTER_H
     21 
     22 #include <grpc/grpc.h>
     23 #include <grpc/support/alloc.h>
     24 #include <grpcpp/impl/codegen/config.h>
     25 
     26 #include <functional>
     27 #include <vector>
     28 
     29 #include "src/core/lib/channel/channel_stack.h"
     30 #include "src/core/lib/surface/channel_init.h"
     31 #include "src/core/lib/transport/metadata_batch.h"
     32 
     33 /// An interface to define filters.
     34 ///
     35 /// To define a filter, implement a subclass of each of \c CallData and
     36 /// \c ChannelData. Then register the filter using something like this:
     37 /// \code{.cpp}
     38 ///   RegisterChannelFilter<MyChannelDataSubclass, MyCallDataSubclass>(
     39 ///       "name-of-filter", GRPC_SERVER_CHANNEL, INT_MAX, nullptr);
     40 /// \endcode
     41 
     42 namespace grpc {
     43 
     44 /// A C++ wrapper for the \c grpc_metadata_batch struct.
     45 class MetadataBatch {
     46  public:
     47   /// Borrows a pointer to \a batch, but does NOT take ownership.
     48   /// The caller must ensure that \a batch continues to exist for as
     49   /// long as the MetadataBatch object does.
     50   explicit MetadataBatch(grpc_metadata_batch* batch) : batch_(batch) {}
     51 
     52   grpc_metadata_batch* batch() const { return batch_; }
     53 
     54   /// Adds metadata and returns the newly allocated storage.
     55   /// The caller takes ownership of the result, which must exist for the
     56   /// lifetime of the gRPC call.
     57   grpc_linked_mdelem* AddMetadata(const string& key, const string& value);
     58 
     59   class const_iterator : public std::iterator<std::bidirectional_iterator_tag,
     60                                               const grpc_mdelem> {
     61    public:
     62     const grpc_mdelem& operator*() const { return elem_->md; }
     63     const grpc_mdelem operator->() const { return elem_->md; }
     64 
     65     const_iterator& operator++() {
     66       elem_ = elem_->next;
     67       return *this;
     68     }
     69     const_iterator operator++(int) {
     70       const_iterator tmp(*this);
     71       operator++();
     72       return tmp;
     73     }
     74     const_iterator& operator--() {
     75       elem_ = elem_->prev;
     76       return *this;
     77     }
     78     const_iterator operator--(int) {
     79       const_iterator tmp(*this);
     80       operator--();
     81       return tmp;
     82     }
     83 
     84     bool operator==(const const_iterator& other) const {
     85       return elem_ == other.elem_;
     86     }
     87     bool operator!=(const const_iterator& other) const {
     88       return elem_ != other.elem_;
     89     }
     90 
     91    private:
     92     friend class MetadataBatch;
     93     explicit const_iterator(grpc_linked_mdelem* elem) : elem_(elem) {}
     94 
     95     grpc_linked_mdelem* elem_;
     96   };
     97 
     98   const_iterator begin() const { return const_iterator(batch_->list.head); }
     99   const_iterator end() const { return const_iterator(nullptr); }
    100 
    101  private:
    102   grpc_metadata_batch* batch_;  // Not owned.
    103 };
    104 
    105 /// A C++ wrapper for the \c grpc_transport_op struct.
    106 class TransportOp {
    107  public:
    108   /// Borrows a pointer to \a op, but does NOT take ownership.
    109   /// The caller must ensure that \a op continues to exist for as
    110   /// long as the TransportOp object does.
    111   explicit TransportOp(grpc_transport_op* op) : op_(op) {}
    112 
    113   grpc_transport_op* op() const { return op_; }
    114 
    115   // TODO(roth): Add a C++ wrapper for grpc_error?
    116   grpc_error* disconnect_with_error() const {
    117     return op_->disconnect_with_error;
    118   }
    119   bool send_goaway() const { return op_->goaway_error != GRPC_ERROR_NONE; }
    120 
    121   // TODO(roth): Add methods for additional fields as needed.
    122 
    123  private:
    124   grpc_transport_op* op_;  // Not owned.
    125 };
    126 
    127 /// A C++ wrapper for the \c grpc_transport_stream_op_batch struct.
    128 class TransportStreamOpBatch {
    129  public:
    130   /// Borrows a pointer to \a op, but does NOT take ownership.
    131   /// The caller must ensure that \a op continues to exist for as
    132   /// long as the TransportStreamOpBatch object does.
    133   explicit TransportStreamOpBatch(grpc_transport_stream_op_batch* op)
    134       : op_(op),
    135         send_initial_metadata_(
    136             op->send_initial_metadata
    137                 ? op->payload->send_initial_metadata.send_initial_metadata
    138                 : nullptr),
    139         send_trailing_metadata_(
    140             op->send_trailing_metadata
    141                 ? op->payload->send_trailing_metadata.send_trailing_metadata
    142                 : nullptr),
    143         recv_initial_metadata_(
    144             op->recv_initial_metadata
    145                 ? op->payload->recv_initial_metadata.recv_initial_metadata
    146                 : nullptr),
    147         recv_trailing_metadata_(
    148             op->recv_trailing_metadata
    149                 ? op->payload->recv_trailing_metadata.recv_trailing_metadata
    150                 : nullptr) {}
    151 
    152   grpc_transport_stream_op_batch* op() const { return op_; }
    153 
    154   grpc_closure* on_complete() const { return op_->on_complete; }
    155   void set_on_complete(grpc_closure* closure) { op_->on_complete = closure; }
    156 
    157   MetadataBatch* send_initial_metadata() {
    158     return op_->send_initial_metadata ? &send_initial_metadata_ : nullptr;
    159   }
    160   MetadataBatch* send_trailing_metadata() {
    161     return op_->send_trailing_metadata ? &send_trailing_metadata_ : nullptr;
    162   }
    163   MetadataBatch* recv_initial_metadata() {
    164     return op_->recv_initial_metadata ? &recv_initial_metadata_ : nullptr;
    165   }
    166   MetadataBatch* recv_trailing_metadata() {
    167     return op_->recv_trailing_metadata ? &recv_trailing_metadata_ : nullptr;
    168   }
    169 
    170   uint32_t* send_initial_metadata_flags() const {
    171     return op_->send_initial_metadata ? &op_->payload->send_initial_metadata
    172                                              .send_initial_metadata_flags
    173                                       : nullptr;
    174   }
    175 
    176   grpc_closure* recv_initial_metadata_ready() const {
    177     return op_->recv_initial_metadata
    178                ? op_->payload->recv_initial_metadata.recv_initial_metadata_ready
    179                : nullptr;
    180   }
    181   void set_recv_initial_metadata_ready(grpc_closure* closure) {
    182     op_->payload->recv_initial_metadata.recv_initial_metadata_ready = closure;
    183   }
    184 
    185   grpc_core::OrphanablePtr<grpc_core::ByteStream>* send_message() const {
    186     return op_->send_message ? &op_->payload->send_message.send_message
    187                              : nullptr;
    188   }
    189   void set_send_message(
    190       grpc_core::OrphanablePtr<grpc_core::ByteStream> send_message) {
    191     op_->send_message = true;
    192     op_->payload->send_message.send_message = std::move(send_message);
    193   }
    194 
    195   grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message() const {
    196     return op_->recv_message ? op_->payload->recv_message.recv_message
    197                              : nullptr;
    198   }
    199   void set_recv_message(
    200       grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message) {
    201     op_->recv_message = true;
    202     op_->payload->recv_message.recv_message = recv_message;
    203   }
    204 
    205   census_context* get_census_context() const {
    206     return static_cast<census_context*>(
    207         op_->payload->context[GRPC_CONTEXT_TRACING].value);
    208   }
    209 
    210   const gpr_atm* get_peer_string() const {
    211     if (op_->send_initial_metadata &&
    212         op_->payload->send_initial_metadata.peer_string != nullptr) {
    213       return op_->payload->send_initial_metadata.peer_string;
    214     } else if (op_->recv_initial_metadata &&
    215                op_->payload->recv_initial_metadata.peer_string != nullptr) {
    216       return op_->payload->recv_initial_metadata.peer_string;
    217     } else {
    218       return nullptr;
    219     }
    220   }
    221 
    222  private:
    223   grpc_transport_stream_op_batch* op_;  // Not owned.
    224   MetadataBatch send_initial_metadata_;
    225   MetadataBatch send_trailing_metadata_;
    226   MetadataBatch recv_initial_metadata_;
    227   MetadataBatch recv_trailing_metadata_;
    228 };
    229 
    230 /// Represents channel data.
    231 class ChannelData {
    232  public:
    233   ChannelData() {}
    234   virtual ~ChannelData() {}
    235 
    236   // TODO(roth): Come up with a more C++-like API for the channel element.
    237 
    238   /// Initializes the channel data.
    239   virtual grpc_error* Init(grpc_channel_element* elem,
    240                            grpc_channel_element_args* args) {
    241     return GRPC_ERROR_NONE;
    242   }
    243 
    244   // Called before destruction.
    245   virtual void Destroy(grpc_channel_element* elem) {}
    246 
    247   virtual void StartTransportOp(grpc_channel_element* elem, TransportOp* op);
    248 
    249   virtual void GetInfo(grpc_channel_element* elem,
    250                        const grpc_channel_info* channel_info);
    251 };
    252 
    253 /// Represents call data.
    254 class CallData {
    255  public:
    256   CallData() {}
    257   virtual ~CallData() {}
    258 
    259   // TODO(roth): Come up with a more C++-like API for the call element.
    260 
    261   /// Initializes the call data.
    262   virtual grpc_error* Init(grpc_call_element* elem,
    263                            const grpc_call_element_args* args) {
    264     return GRPC_ERROR_NONE;
    265   }
    266 
    267   // Called before destruction.
    268   virtual void Destroy(grpc_call_element* elem,
    269                        const grpc_call_final_info* final_info,
    270                        grpc_closure* then_call_closure) {}
    271 
    272   /// Starts a new stream operation.
    273   virtual void StartTransportStreamOpBatch(grpc_call_element* elem,
    274                                            TransportStreamOpBatch* op);
    275 
    276   /// Sets a pollset or pollset set.
    277   virtual void SetPollsetOrPollsetSet(grpc_call_element* elem,
    278                                       grpc_polling_entity* pollent);
    279 };
    280 
    281 namespace internal {
    282 
    283 // Defines static members for passing to C core.
    284 // Members of this class correspond to the members of the C
    285 // grpc_channel_filter struct.
    286 template <typename ChannelDataType, typename CallDataType>
    287 class ChannelFilter final {
    288  public:
    289   static const size_t channel_data_size = sizeof(ChannelDataType);
    290 
    291   static grpc_error* InitChannelElement(grpc_channel_element* elem,
    292                                         grpc_channel_element_args* args) {
    293     // Construct the object in the already-allocated memory.
    294     ChannelDataType* channel_data = new (elem->channel_data) ChannelDataType();
    295     return channel_data->Init(elem, args);
    296   }
    297 
    298   static void DestroyChannelElement(grpc_channel_element* elem) {
    299     ChannelDataType* channel_data =
    300         static_cast<ChannelDataType*>(elem->channel_data);
    301     channel_data->Destroy(elem);
    302     channel_data->~ChannelDataType();
    303   }
    304 
    305   static void StartTransportOp(grpc_channel_element* elem,
    306                                grpc_transport_op* op) {
    307     ChannelDataType* channel_data =
    308         static_cast<ChannelDataType*>(elem->channel_data);
    309     TransportOp op_wrapper(op);
    310     channel_data->StartTransportOp(elem, &op_wrapper);
    311   }
    312 
    313   static void GetChannelInfo(grpc_channel_element* elem,
    314                              const grpc_channel_info* channel_info) {
    315     ChannelDataType* channel_data =
    316         static_cast<ChannelDataType*>(elem->channel_data);
    317     channel_data->GetInfo(elem, channel_info);
    318   }
    319 
    320   static const size_t call_data_size = sizeof(CallDataType);
    321 
    322   static grpc_error* InitCallElement(grpc_call_element* elem,
    323                                      const grpc_call_element_args* args) {
    324     // Construct the object in the already-allocated memory.
    325     CallDataType* call_data = new (elem->call_data) CallDataType();
    326     return call_data->Init(elem, args);
    327   }
    328 
    329   static void DestroyCallElement(grpc_call_element* elem,
    330                                  const grpc_call_final_info* final_info,
    331                                  grpc_closure* then_call_closure) {
    332     CallDataType* call_data = static_cast<CallDataType*>(elem->call_data);
    333     call_data->Destroy(elem, final_info, then_call_closure);
    334     call_data->~CallDataType();
    335   }
    336 
    337   static void StartTransportStreamOpBatch(grpc_call_element* elem,
    338                                           grpc_transport_stream_op_batch* op) {
    339     CallDataType* call_data = static_cast<CallDataType*>(elem->call_data);
    340     TransportStreamOpBatch op_wrapper(op);
    341     call_data->StartTransportStreamOpBatch(elem, &op_wrapper);
    342   }
    343 
    344   static void SetPollsetOrPollsetSet(grpc_call_element* elem,
    345                                      grpc_polling_entity* pollent) {
    346     CallDataType* call_data = static_cast<CallDataType*>(elem->call_data);
    347     call_data->SetPollsetOrPollsetSet(elem, pollent);
    348   }
    349 };
    350 
// One registered filter, as recorded by RegisterChannelFilter().
// NOTE: RegisterChannelFilter() brace-initializes this struct positionally,
// so the member order below must not change.
struct FilterRecord {
  // Channel stack type supplied at registration.
  grpc_channel_stack_type stack_type;
  // Priority value supplied at registration.
  int priority;
  // Optional run-time predicate supplied at registration.
  std::function<bool(const grpc_channel_args&)> include_filter;
  // C-core filter table built from the ChannelFilter<> static members.
  grpc_channel_filter filter;
};
// List of registered filters. RegisterChannelFilter() allocates it on first
// use (it checks for nullptr); defined out of line.
extern std::vector<FilterRecord>* channel_filters;

// Plugin init/shutdown callbacks that RegisterChannelFilter() passes to
// grpc_register_plugin(); defined out of line.
void ChannelFilterPluginInit();
void ChannelFilterPluginShutdown();
    361 
    362 }  // namespace internal
    363 
    364 /// Registers a new filter.
    365 /// Must be called by only one thread at a time.
    366 /// The \a include_filter argument specifies a function that will be called
    367 /// to determine at run-time whether or not to add the filter. If the
    368 /// value is nullptr, the filter will be added unconditionally.
    369 template <typename ChannelDataType, typename CallDataType>
    370 void RegisterChannelFilter(
    371     const char* name, grpc_channel_stack_type stack_type, int priority,
    372     std::function<bool(const grpc_channel_args&)> include_filter) {
    373   // If we haven't been called before, initialize channel_filters and
    374   // call grpc_register_plugin().
    375   if (internal::channel_filters == nullptr) {
    376     grpc_register_plugin(internal::ChannelFilterPluginInit,
    377                          internal::ChannelFilterPluginShutdown);
    378     internal::channel_filters = new std::vector<internal::FilterRecord>();
    379   }
    380   // Add an entry to channel_filters. The filter will be added when the
    381   // C-core initialization code calls ChannelFilterPluginInit().
    382   typedef internal::ChannelFilter<ChannelDataType, CallDataType> FilterType;
    383   internal::FilterRecord filter_record = {
    384       stack_type,
    385       priority,
    386       include_filter,
    387       {FilterType::StartTransportStreamOpBatch, FilterType::StartTransportOp,
    388        FilterType::call_data_size, FilterType::InitCallElement,
    389        FilterType::SetPollsetOrPollsetSet, FilterType::DestroyCallElement,
    390        FilterType::channel_data_size, FilterType::InitChannelElement,
    391        FilterType::DestroyChannelElement, FilterType::GetChannelInfo, name}};
    392   internal::channel_filters->push_back(filter_record);
    393 }
    394 
    395 }  // namespace grpc
    396 
    397 #endif  // GRPCXX_CHANNEL_FILTER_H
    398