1 // 2 // Copyright 2016 gRPC authors. 3 // 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at 7 // 8 // http://www.apache.org/licenses/LICENSE-2.0 9 // 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 // 16 17 #include <grpc/support/port_platform.h> 18 19 #include "src/core/ext/filters/message_size/message_size_filter.h" 20 21 #include <limits.h> 22 #include <string.h> 23 24 #include <grpc/impl/codegen/grpc_types.h> 25 #include <grpc/support/alloc.h> 26 #include <grpc/support/log.h> 27 #include <grpc/support/string_util.h> 28 29 #include "src/core/lib/channel/channel_args.h" 30 #include "src/core/lib/channel/channel_stack_builder.h" 31 #include "src/core/lib/gpr/string.h" 32 #include "src/core/lib/gprpp/ref_counted.h" 33 #include "src/core/lib/gprpp/ref_counted_ptr.h" 34 #include "src/core/lib/surface/channel_init.h" 35 #include "src/core/lib/transport/service_config.h" 36 37 typedef struct { 38 int max_send_size; 39 int max_recv_size; 40 } message_size_limits; 41 42 namespace grpc_core { 43 namespace { 44 45 class MessageSizeLimits : public RefCounted<MessageSizeLimits> { 46 public: 47 static RefCountedPtr<MessageSizeLimits> CreateFromJson(const grpc_json* json); 48 49 const message_size_limits& limits() const { return limits_; } 50 51 private: 52 // So New() can call our private ctor. 53 template <typename T, typename... Args> 54 friend T* grpc_core::New(Args&&... args); 55 56 MessageSizeLimits(int max_send_size, int max_recv_size) { 57 limits_.max_send_size = max_send_size; 58 limits_.max_recv_size = max_recv_size; 59 } 60 61 message_size_limits limits_; 62 }; 63 64 RefCountedPtr<MessageSizeLimits> MessageSizeLimits::CreateFromJson( 65 const grpc_json* json) { 66 int max_request_message_bytes = -1; 67 int max_response_message_bytes = -1; 68 for (grpc_json* field = json->child; field != nullptr; field = field->next) { 69 if (field->key == nullptr) continue; 70 if (strcmp(field->key, "maxRequestMessageBytes") == 0) { 71 if (max_request_message_bytes >= 0) return nullptr; // Duplicate. 72 if (field->type != GRPC_JSON_STRING && field->type != GRPC_JSON_NUMBER) { 73 return nullptr; 74 } 75 max_request_message_bytes = gpr_parse_nonnegative_int(field->value); 76 if (max_request_message_bytes == -1) return nullptr; 77 } else if (strcmp(field->key, "maxResponseMessageBytes") == 0) { 78 if (max_response_message_bytes >= 0) return nullptr; // Duplicate. 79 if (field->type != GRPC_JSON_STRING && field->type != GRPC_JSON_NUMBER) { 80 return nullptr; 81 } 82 max_response_message_bytes = gpr_parse_nonnegative_int(field->value); 83 if (max_response_message_bytes == -1) return nullptr; 84 } 85 } 86 return MakeRefCounted<MessageSizeLimits>(max_request_message_bytes, 87 max_response_message_bytes); 88 } 89 90 } // namespace 91 } // namespace grpc_core 92 93 namespace { 94 95 struct call_data { 96 grpc_call_combiner* call_combiner; 97 message_size_limits limits; 98 // Receive closures are chained: we inject this closure as the 99 // recv_message_ready up-call on transport_stream_op, and remember to 100 // call our next_recv_message_ready member after handling it. 101 grpc_closure recv_message_ready; 102 grpc_closure recv_trailing_metadata_ready; 103 // The error caused by a message that is too large, or GRPC_ERROR_NONE 104 grpc_error* error; 105 // Used by recv_message_ready. 106 grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message; 107 // Original recv_message_ready callback, invoked after our own. 108 grpc_closure* next_recv_message_ready; 109 // Original recv_trailing_metadata callback, invoked after our own. 110 grpc_closure* original_recv_trailing_metadata_ready; 111 }; 112 113 struct channel_data { 114 message_size_limits limits; 115 // Maps path names to refcounted_message_size_limits structs. 116 grpc_core::RefCountedPtr<grpc_core::SliceHashTable< 117 grpc_core::RefCountedPtr<grpc_core::MessageSizeLimits>>> 118 method_limit_table; 119 }; 120 121 } // namespace 122 123 // Callback invoked when we receive a message. Here we check the max 124 // receive message size. 125 static void recv_message_ready(void* user_data, grpc_error* error) { 126 grpc_call_element* elem = static_cast<grpc_call_element*>(user_data); 127 call_data* calld = static_cast<call_data*>(elem->call_data); 128 if (*calld->recv_message != nullptr && calld->limits.max_recv_size >= 0 && 129 (*calld->recv_message)->length() > 130 static_cast<size_t>(calld->limits.max_recv_size)) { 131 char* message_string; 132 gpr_asprintf(&message_string, 133 "Received message larger than max (%u vs. %d)", 134 (*calld->recv_message)->length(), calld->limits.max_recv_size); 135 grpc_error* new_error = grpc_error_set_int( 136 GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string), 137 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED); 138 GRPC_ERROR_UNREF(calld->error); 139 if (error == GRPC_ERROR_NONE) { 140 error = new_error; 141 } else { 142 error = grpc_error_add_child(error, new_error); 143 } 144 calld->error = GRPC_ERROR_REF(error); 145 gpr_free(message_string); 146 } else { 147 GRPC_ERROR_REF(error); 148 } 149 // Invoke the next callback. 150 GRPC_CLOSURE_RUN(calld->next_recv_message_ready, error); 151 } 152 153 // Callback invoked on completion of recv_trailing_metadata 154 // Notifies the recv_trailing_metadata batch of any message size failures 155 static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) { 156 grpc_call_element* elem = static_cast<grpc_call_element*>(user_data); 157 call_data* calld = static_cast<call_data*>(elem->call_data); 158 error = 159 grpc_error_add_child(GRPC_ERROR_REF(error), GRPC_ERROR_REF(calld->error)); 160 // Invoke the next callback. 161 GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error); 162 } 163 164 // Start transport stream op. 165 static void start_transport_stream_op_batch( 166 grpc_call_element* elem, grpc_transport_stream_op_batch* op) { 167 call_data* calld = static_cast<call_data*>(elem->call_data); 168 // Check max send message size. 169 if (op->send_message && calld->limits.max_send_size >= 0 && 170 op->payload->send_message.send_message->length() > 171 static_cast<size_t>(calld->limits.max_send_size)) { 172 char* message_string; 173 gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %d)", 174 op->payload->send_message.send_message->length(), 175 calld->limits.max_send_size); 176 grpc_transport_stream_op_batch_finish_with_failure( 177 op, 178 grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string), 179 GRPC_ERROR_INT_GRPC_STATUS, 180 GRPC_STATUS_RESOURCE_EXHAUSTED), 181 calld->call_combiner); 182 gpr_free(message_string); 183 return; 184 } 185 // Inject callback for receiving a message. 186 if (op->recv_message) { 187 calld->next_recv_message_ready = 188 op->payload->recv_message.recv_message_ready; 189 calld->recv_message = op->payload->recv_message.recv_message; 190 op->payload->recv_message.recv_message_ready = &calld->recv_message_ready; 191 } 192 // Inject callback for receiving trailing metadata. 193 if (op->recv_trailing_metadata) { 194 calld->original_recv_trailing_metadata_ready = 195 op->payload->recv_trailing_metadata.recv_trailing_metadata_ready; 196 op->payload->recv_trailing_metadata.recv_trailing_metadata_ready = 197 &calld->recv_trailing_metadata_ready; 198 } 199 // Chain to the next filter. 200 grpc_call_next_op(elem, op); 201 } 202 203 // Constructor for call_data. 204 static grpc_error* init_call_elem(grpc_call_element* elem, 205 const grpc_call_element_args* args) { 206 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 207 call_data* calld = static_cast<call_data*>(elem->call_data); 208 calld->call_combiner = args->call_combiner; 209 calld->next_recv_message_ready = nullptr; 210 calld->original_recv_trailing_metadata_ready = nullptr; 211 calld->error = GRPC_ERROR_NONE; 212 GRPC_CLOSURE_INIT(&calld->recv_message_ready, recv_message_ready, elem, 213 grpc_schedule_on_exec_ctx); 214 GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready, 215 recv_trailing_metadata_ready, elem, 216 grpc_schedule_on_exec_ctx); 217 // Get max sizes from channel data, then merge in per-method config values. 218 // Note: Per-method config is only available on the client, so we 219 // apply the max request size to the send limit and the max response 220 // size to the receive limit. 221 calld->limits = chand->limits; 222 if (chand->method_limit_table != nullptr) { 223 grpc_core::RefCountedPtr<grpc_core::MessageSizeLimits> limits = 224 grpc_core::ServiceConfig::MethodConfigTableLookup( 225 *chand->method_limit_table, args->path); 226 if (limits != nullptr) { 227 if (limits->limits().max_send_size >= 0 && 228 (limits->limits().max_send_size < calld->limits.max_send_size || 229 calld->limits.max_send_size < 0)) { 230 calld->limits.max_send_size = limits->limits().max_send_size; 231 } 232 if (limits->limits().max_recv_size >= 0 && 233 (limits->limits().max_recv_size < calld->limits.max_recv_size || 234 calld->limits.max_recv_size < 0)) { 235 calld->limits.max_recv_size = limits->limits().max_recv_size; 236 } 237 } 238 } 239 return GRPC_ERROR_NONE; 240 } 241 242 // Destructor for call_data. 243 static void destroy_call_elem(grpc_call_element* elem, 244 const grpc_call_final_info* final_info, 245 grpc_closure* ignored) { 246 call_data* calld = (call_data*)elem->call_data; 247 GRPC_ERROR_UNREF(calld->error); 248 } 249 250 static int default_size(const grpc_channel_args* args, 251 int without_minimal_stack) { 252 if (grpc_channel_args_want_minimal_stack(args)) { 253 return -1; 254 } 255 return without_minimal_stack; 256 } 257 258 message_size_limits get_message_size_limits( 259 const grpc_channel_args* channel_args) { 260 message_size_limits lim; 261 lim.max_send_size = 262 default_size(channel_args, GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH); 263 lim.max_recv_size = 264 default_size(channel_args, GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH); 265 for (size_t i = 0; i < channel_args->num_args; ++i) { 266 if (strcmp(channel_args->args[i].key, GRPC_ARG_MAX_SEND_MESSAGE_LENGTH) == 267 0) { 268 const grpc_integer_options options = {lim.max_send_size, -1, INT_MAX}; 269 lim.max_send_size = 270 grpc_channel_arg_get_integer(&channel_args->args[i], options); 271 } 272 if (strcmp(channel_args->args[i].key, 273 GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH) == 0) { 274 const grpc_integer_options options = {lim.max_recv_size, -1, INT_MAX}; 275 lim.max_recv_size = 276 grpc_channel_arg_get_integer(&channel_args->args[i], options); 277 } 278 } 279 return lim; 280 } 281 282 // Constructor for channel_data. 283 static grpc_error* init_channel_elem(grpc_channel_element* elem, 284 grpc_channel_element_args* args) { 285 GPR_ASSERT(!args->is_last); 286 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 287 chand->limits = get_message_size_limits(args->channel_args); 288 // Get method config table from channel args. 289 const grpc_arg* channel_arg = 290 grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG); 291 const char* service_config_str = grpc_channel_arg_get_string(channel_arg); 292 if (service_config_str != nullptr) { 293 grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config = 294 grpc_core::ServiceConfig::Create(service_config_str); 295 if (service_config != nullptr) { 296 chand->method_limit_table = service_config->CreateMethodConfigTable( 297 grpc_core::MessageSizeLimits::CreateFromJson); 298 } 299 } 300 return GRPC_ERROR_NONE; 301 } 302 303 // Destructor for channel_data. 304 static void destroy_channel_elem(grpc_channel_element* elem) { 305 channel_data* chand = static_cast<channel_data*>(elem->channel_data); 306 chand->method_limit_table.reset(); 307 } 308 309 const grpc_channel_filter grpc_message_size_filter = { 310 start_transport_stream_op_batch, 311 grpc_channel_next_op, 312 sizeof(call_data), 313 init_call_elem, 314 grpc_call_stack_ignore_set_pollset_or_pollset_set, 315 destroy_call_elem, 316 sizeof(channel_data), 317 init_channel_elem, 318 destroy_channel_elem, 319 grpc_channel_next_get_info, 320 "message_size"}; 321 322 static bool maybe_add_message_size_filter(grpc_channel_stack_builder* builder, 323 void* arg) { 324 const grpc_channel_args* channel_args = 325 grpc_channel_stack_builder_get_channel_arguments(builder); 326 bool enable = false; 327 message_size_limits lim = get_message_size_limits(channel_args); 328 if (lim.max_send_size != -1 || lim.max_recv_size != -1) { 329 enable = true; 330 } 331 const grpc_arg* a = 332 grpc_channel_args_find(channel_args, GRPC_ARG_SERVICE_CONFIG); 333 if (a != nullptr) { 334 enable = true; 335 } 336 if (enable) { 337 return grpc_channel_stack_builder_prepend_filter( 338 builder, &grpc_message_size_filter, nullptr, nullptr); 339 } else { 340 return true; 341 } 342 } 343 344 void grpc_message_size_filter_init(void) { 345 grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, 346 GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, 347 maybe_add_message_size_filter, nullptr); 348 grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL, 349 GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, 350 maybe_add_message_size_filter, nullptr); 351 grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, 352 GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, 353 maybe_add_message_size_filter, nullptr); 354 } 355 356 void grpc_message_size_filter_shutdown(void) {} 357