Home | History | Annotate | Download | only in workarounds
      1 //
      2 // Copyright 2017 gRPC authors.
      3 //
      4 // Licensed under the Apache License, Version 2.0 (the "License");
      5 // you may not use this file except in compliance with the License.
      6 // You may obtain a copy of the License at
      7 //
      8 //     http://www.apache.org/licenses/LICENSE-2.0
      9 //
     10 // Unless required by applicable law or agreed to in writing, software
     11 // distributed under the License is distributed on an "AS IS" BASIS,
     12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13 // See the License for the specific language governing permissions and
     14 // limitations under the License.
     15 //
     16 
     17 #include <grpc/support/port_platform.h>
     18 
     19 #include "src/core/ext/filters/workarounds/workaround_cronet_compression_filter.h"
     20 
     21 #include <string.h>
     22 
     23 #include <grpc/support/alloc.h>
     24 
     25 #include "src/core/ext/filters/workarounds/workaround_utils.h"
     26 #include "src/core/lib/channel/channel_stack_builder.h"
     27 #include "src/core/lib/surface/channel_init.h"
     28 #include "src/core/lib/transport/metadata.h"
     29 
     30 namespace {
     31 struct call_data {
     32   // Receive closures are chained: we inject this closure as the
     33   // recv_initial_metadata_ready up-call on transport_stream_op, and remember to
     34   // call our next_recv_initial_metadata_ready member after handling it.
     35   grpc_closure recv_initial_metadata_ready;
     36   // Used by recv_initial_metadata_ready.
     37   grpc_metadata_batch* recv_initial_metadata;
     38   // Original recv_initial_metadata_ready callback, invoked after our own.
     39   grpc_closure* next_recv_initial_metadata_ready;
     40 
     41   // Marks whether the workaround is active
     42   bool workaround_active;
     43 };
     44 }  // namespace
     45 
     46 // Find the user agent metadata element in the batch
     47 static bool get_user_agent_mdelem(const grpc_metadata_batch* batch,
     48                                   grpc_mdelem* md) {
     49   if (batch->idx.named.user_agent != nullptr) {
     50     *md = batch->idx.named.user_agent->md;
     51     return true;
     52   }
     53   return false;
     54 }
     55 
     56 // Callback invoked when we receive an initial metadata.
     57 static void recv_initial_metadata_ready(void* user_data, grpc_error* error) {
     58   grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
     59   call_data* calld = static_cast<call_data*>(elem->call_data);
     60 
     61   if (GRPC_ERROR_NONE == error) {
     62     grpc_mdelem md;
     63     if (get_user_agent_mdelem(calld->recv_initial_metadata, &md)) {
     64       grpc_workaround_user_agent_md* user_agent_md = grpc_parse_user_agent(md);
     65       if (user_agent_md
     66               ->workaround_active[GRPC_WORKAROUND_ID_CRONET_COMPRESSION]) {
     67         calld->workaround_active = true;
     68       }
     69     }
     70   }
     71 
     72   // Invoke the next callback.
     73   GRPC_CLOSURE_RUN(calld->next_recv_initial_metadata_ready,
     74                    GRPC_ERROR_REF(error));
     75 }
     76 
     77 // Start transport stream op.
     78 static void start_transport_stream_op_batch(
     79     grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
     80   call_data* calld = static_cast<call_data*>(elem->call_data);
     81 
     82   // Inject callback for receiving initial metadata
     83   if (op->recv_initial_metadata) {
     84     calld->next_recv_initial_metadata_ready =
     85         op->payload->recv_initial_metadata.recv_initial_metadata_ready;
     86     op->payload->recv_initial_metadata.recv_initial_metadata_ready =
     87         &calld->recv_initial_metadata_ready;
     88     calld->recv_initial_metadata =
     89         op->payload->recv_initial_metadata.recv_initial_metadata;
     90   }
     91 
     92   if (op->send_message) {
     93     /* Send message happens after client's user-agent (initial metadata) is
     94      * received, so workaround_active must be set already */
     95     if (calld->workaround_active) {
     96       op->payload->send_message.send_message->set_flags(
     97           op->payload->send_message.send_message->flags() |
     98           GRPC_WRITE_NO_COMPRESS);
     99     }
    100   }
    101 
    102   // Chain to the next filter.
    103   grpc_call_next_op(elem, op);
    104 }
    105 
    106 // Constructor for call_data.
    107 static grpc_error* init_call_elem(grpc_call_element* elem,
    108                                   const grpc_call_element_args* args) {
    109   call_data* calld = static_cast<call_data*>(elem->call_data);
    110   calld->next_recv_initial_metadata_ready = nullptr;
    111   calld->workaround_active = false;
    112   GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
    113                     recv_initial_metadata_ready, elem,
    114                     grpc_schedule_on_exec_ctx);
    115   return GRPC_ERROR_NONE;
    116 }
    117 
    118 // Destructor for call_data.
    119 static void destroy_call_elem(grpc_call_element* elem,
    120                               const grpc_call_final_info* final_info,
    121                               grpc_closure* ignored) {}
    122 
    123 // Constructor for channel_data.
    124 static grpc_error* init_channel_elem(grpc_channel_element* elem,
    125                                      grpc_channel_element_args* args) {
    126   return GRPC_ERROR_NONE;
    127 }
    128 
    129 // Destructor for channel_data.
    130 static void destroy_channel_elem(grpc_channel_element* elem) {}
    131 
    132 // Parse the user agent
    133 static bool parse_user_agent(grpc_mdelem md) {
    134   const char grpc_objc_specifier[] = "grpc-objc/";
    135   const size_t grpc_objc_specifier_len = sizeof(grpc_objc_specifier) - 1;
    136   const char cronet_specifier[] = "cronet_http";
    137   const size_t cronet_specifier_len = sizeof(cronet_specifier) - 1;
    138 
    139   char* user_agent_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
    140   bool grpc_objc_specifier_seen = false;
    141   bool cronet_specifier_seen = false;
    142   char *major_version_str = user_agent_str, *minor_version_str;
    143   long major_version = 0, minor_version = 0;
    144 
    145   char* head = strtok(user_agent_str, " ");
    146   while (head != nullptr) {
    147     if (!grpc_objc_specifier_seen &&
    148         0 == strncmp(head, grpc_objc_specifier, grpc_objc_specifier_len)) {
    149       major_version_str = head + grpc_objc_specifier_len;
    150       grpc_objc_specifier_seen = true;
    151     } else if (grpc_objc_specifier_seen &&
    152                0 == strncmp(head, cronet_specifier, cronet_specifier_len)) {
    153       cronet_specifier_seen = true;
    154       break;
    155     }
    156 
    157     head = strtok(nullptr, " ");
    158   }
    159   if (grpc_objc_specifier_seen) {
    160     major_version_str = strtok(major_version_str, ".");
    161     minor_version_str = strtok(nullptr, ".");
    162     major_version = atol(major_version_str);
    163     minor_version = atol(minor_version_str);
    164   }
    165 
    166   gpr_free(user_agent_str);
    167   return (grpc_objc_specifier_seen && cronet_specifier_seen &&
    168           (major_version < 1 || (major_version == 1 && minor_version <= 3)));
    169 }
    170 
    171 const grpc_channel_filter grpc_workaround_cronet_compression_filter = {
    172     start_transport_stream_op_batch,
    173     grpc_channel_next_op,
    174     sizeof(call_data),
    175     init_call_elem,
    176     grpc_call_stack_ignore_set_pollset_or_pollset_set,
    177     destroy_call_elem,
    178     0,
    179     init_channel_elem,
    180     destroy_channel_elem,
    181     grpc_channel_next_get_info,
    182     "workaround_cronet_compression"};
    183 
    184 static bool register_workaround_cronet_compression(
    185     grpc_channel_stack_builder* builder, void* arg) {
    186   const grpc_channel_args* channel_args =
    187       grpc_channel_stack_builder_get_channel_arguments(builder);
    188   const grpc_arg* a = grpc_channel_args_find(
    189       channel_args, GRPC_ARG_WORKAROUND_CRONET_COMPRESSION);
    190   if (a == nullptr) {
    191     return true;
    192   }
    193   if (grpc_channel_arg_get_bool(a, false) == false) {
    194     return true;
    195   }
    196   return grpc_channel_stack_builder_prepend_filter(
    197       builder, &grpc_workaround_cronet_compression_filter, nullptr, nullptr);
    198 }
    199 
    200 void grpc_workaround_cronet_compression_filter_init(void) {
    201   grpc_channel_init_register_stage(
    202       GRPC_SERVER_CHANNEL, GRPC_WORKAROUND_PRIORITY_HIGH,
    203       register_workaround_cronet_compression, nullptr);
    204   grpc_register_workaround(GRPC_WORKAROUND_ID_CRONET_COMPRESSION,
    205                            parse_user_agent);
    206 }
    207 
    208 void grpc_workaround_cronet_compression_filter_shutdown(void) {}
    209