1 /* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #include <grpc/support/port_platform.h> 20 21 #include "src/core/ext/transport/chttp2/transport/frame_data.h" 22 23 #include <string.h> 24 25 #include <grpc/support/alloc.h> 26 #include <grpc/support/log.h> 27 #include <grpc/support/string_util.h> 28 #include "src/core/ext/transport/chttp2/transport/internal.h" 29 #include "src/core/lib/gpr/string.h" 30 #include "src/core/lib/gprpp/memory.h" 31 #include "src/core/lib/slice/slice_internal.h" 32 #include "src/core/lib/slice/slice_string_helpers.h" 33 #include "src/core/lib/transport/transport.h" 34 35 grpc_error* grpc_chttp2_data_parser_init(grpc_chttp2_data_parser* parser) { 36 parser->state = GRPC_CHTTP2_DATA_FH_0; 37 parser->parsing_frame = nullptr; 38 return GRPC_ERROR_NONE; 39 } 40 41 void grpc_chttp2_data_parser_destroy(grpc_chttp2_data_parser* parser) { 42 if (parser->parsing_frame != nullptr) { 43 GRPC_ERROR_UNREF(parser->parsing_frame->Finished( 44 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Parser destroyed"), false)); 45 } 46 GRPC_ERROR_UNREF(parser->error); 47 } 48 49 grpc_error* grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser* parser, 50 uint8_t flags, 51 uint32_t stream_id, 52 grpc_chttp2_stream* s) { 53 if (flags & ~GRPC_CHTTP2_DATA_FLAG_END_STREAM) { 54 char* msg; 55 gpr_asprintf(&msg, "unsupported data flags: 0x%02x", flags); 56 grpc_error* err = grpc_error_set_int( 57 GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg), GRPC_ERROR_INT_STREAM_ID, 58 static_cast<intptr_t>(stream_id)); 59 gpr_free(msg); 60 return err; 61 } 62 63 if (flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) { 64 s->received_last_frame = true; 65 } else { 66 s->received_last_frame = false; 67 } 68 69 return GRPC_ERROR_NONE; 70 } 71 72 void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer* inbuf, 73 uint32_t write_bytes, int is_eof, 74 grpc_transport_one_way_stats* stats, 75 grpc_slice_buffer* outbuf) { 76 grpc_slice hdr; 77 uint8_t* p; 78 static const size_t header_size = 9; 79 80 hdr = GRPC_SLICE_MALLOC(header_size); 81 p = GRPC_SLICE_START_PTR(hdr); 82 GPR_ASSERT(write_bytes < (1 << 24)); 83 *p++ = static_cast<uint8_t>(write_bytes >> 16); 84 *p++ = static_cast<uint8_t>(write_bytes >> 8); 85 *p++ = static_cast<uint8_t>(write_bytes); 86 *p++ = GRPC_CHTTP2_FRAME_DATA; 87 *p++ = is_eof ? GRPC_CHTTP2_DATA_FLAG_END_STREAM : 0; 88 *p++ = static_cast<uint8_t>(id >> 24); 89 *p++ = static_cast<uint8_t>(id >> 16); 90 *p++ = static_cast<uint8_t>(id >> 8); 91 *p++ = static_cast<uint8_t>(id); 92 grpc_slice_buffer_add(outbuf, hdr); 93 94 grpc_slice_buffer_move_first_no_ref(inbuf, write_bytes, outbuf); 95 96 stats->framing_bytes += header_size; 97 stats->data_bytes += write_bytes; 98 } 99 100 grpc_error* grpc_deframe_unprocessed_incoming_frames( 101 grpc_chttp2_data_parser* p, grpc_chttp2_stream* s, 102 grpc_slice_buffer* slices, grpc_slice* slice_out, 103 grpc_core::OrphanablePtr<grpc_core::ByteStream>* stream_out) { 104 grpc_error* error = GRPC_ERROR_NONE; 105 grpc_chttp2_transport* t = s->t; 106 107 while (slices->count > 0) { 108 uint8_t* beg = nullptr; 109 uint8_t* end = nullptr; 110 uint8_t* cur = nullptr; 111 112 grpc_slice slice = grpc_slice_buffer_take_first(slices); 113 114 beg = GRPC_SLICE_START_PTR(slice); 115 end = GRPC_SLICE_END_PTR(slice); 116 cur = beg; 117 uint32_t message_flags; 118 char* msg; 119 120 if (cur == end) { 121 grpc_slice_unref_internal(slice); 122 continue; 123 } 124 125 switch (p->state) { 126 case GRPC_CHTTP2_DATA_ERROR: 127 p->state = GRPC_CHTTP2_DATA_ERROR; 128 grpc_slice_unref_internal(slice); 129 return GRPC_ERROR_REF(p->error); 130 case GRPC_CHTTP2_DATA_FH_0: 131 s->stats.incoming.framing_bytes++; 132 p->frame_type = *cur; 133 switch (p->frame_type) { 134 case 0: 135 p->is_frame_compressed = false; /* GPR_FALSE */ 136 break; 137 case 1: 138 p->is_frame_compressed = true; /* GPR_TRUE */ 139 break; 140 default: 141 gpr_asprintf(&msg, "Bad GRPC frame type 0x%02x", p->frame_type); 142 p->error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); 143 p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID, 144 static_cast<intptr_t>(s->id)); 145 gpr_free(msg); 146 msg = grpc_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII); 147 p->error = grpc_error_set_str(p->error, GRPC_ERROR_STR_RAW_BYTES, 148 grpc_slice_from_copied_string(msg)); 149 gpr_free(msg); 150 p->error = 151 grpc_error_set_int(p->error, GRPC_ERROR_INT_OFFSET, cur - beg); 152 p->state = GRPC_CHTTP2_DATA_ERROR; 153 grpc_slice_unref_internal(slice); 154 return GRPC_ERROR_REF(p->error); 155 } 156 if (++cur == end) { 157 p->state = GRPC_CHTTP2_DATA_FH_1; 158 grpc_slice_unref_internal(slice); 159 continue; 160 } 161 /* fallthrough */ 162 case GRPC_CHTTP2_DATA_FH_1: 163 s->stats.incoming.framing_bytes++; 164 p->frame_size = (static_cast<uint32_t>(*cur)) << 24; 165 if (++cur == end) { 166 p->state = GRPC_CHTTP2_DATA_FH_2; 167 grpc_slice_unref_internal(slice); 168 continue; 169 } 170 /* fallthrough */ 171 case GRPC_CHTTP2_DATA_FH_2: 172 s->stats.incoming.framing_bytes++; 173 p->frame_size |= (static_cast<uint32_t>(*cur)) << 16; 174 if (++cur == end) { 175 p->state = GRPC_CHTTP2_DATA_FH_3; 176 grpc_slice_unref_internal(slice); 177 continue; 178 } 179 /* fallthrough */ 180 case GRPC_CHTTP2_DATA_FH_3: 181 s->stats.incoming.framing_bytes++; 182 p->frame_size |= (static_cast<uint32_t>(*cur)) << 8; 183 if (++cur == end) { 184 p->state = GRPC_CHTTP2_DATA_FH_4; 185 grpc_slice_unref_internal(slice); 186 continue; 187 } 188 /* fallthrough */ 189 case GRPC_CHTTP2_DATA_FH_4: 190 s->stats.incoming.framing_bytes++; 191 GPR_ASSERT(stream_out != nullptr); 192 GPR_ASSERT(p->parsing_frame == nullptr); 193 p->frame_size |= (static_cast<uint32_t>(*cur)); 194 p->state = GRPC_CHTTP2_DATA_FRAME; 195 ++cur; 196 message_flags = 0; 197 if (p->is_frame_compressed) { 198 message_flags |= GRPC_WRITE_INTERNAL_COMPRESS; 199 } 200 p->parsing_frame = grpc_core::New<grpc_core::Chttp2IncomingByteStream>( 201 t, s, p->frame_size, message_flags); 202 stream_out->reset(p->parsing_frame); 203 if (p->parsing_frame->remaining_bytes() == 0) { 204 GRPC_ERROR_UNREF(p->parsing_frame->Finished(GRPC_ERROR_NONE, true)); 205 p->parsing_frame = nullptr; 206 p->state = GRPC_CHTTP2_DATA_FH_0; 207 } 208 s->pending_byte_stream = true; 209 210 if (cur != end) { 211 grpc_slice_buffer_undo_take_first( 212 slices, grpc_slice_sub(slice, static_cast<size_t>(cur - beg), 213 static_cast<size_t>(end - beg))); 214 } 215 grpc_slice_unref_internal(slice); 216 return GRPC_ERROR_NONE; 217 case GRPC_CHTTP2_DATA_FRAME: { 218 GPR_ASSERT(p->parsing_frame != nullptr); 219 GPR_ASSERT(slice_out != nullptr); 220 if (cur == end) { 221 grpc_slice_unref_internal(slice); 222 continue; 223 } 224 uint32_t remaining = static_cast<uint32_t>(end - cur); 225 if (remaining == p->frame_size) { 226 s->stats.incoming.data_bytes += remaining; 227 if (GRPC_ERROR_NONE != 228 (error = p->parsing_frame->Push( 229 grpc_slice_sub(slice, static_cast<size_t>(cur - beg), 230 static_cast<size_t>(end - beg)), 231 slice_out))) { 232 grpc_slice_unref_internal(slice); 233 return error; 234 } 235 if (GRPC_ERROR_NONE != 236 (error = p->parsing_frame->Finished(GRPC_ERROR_NONE, true))) { 237 grpc_slice_unref_internal(slice); 238 return error; 239 } 240 p->parsing_frame = nullptr; 241 p->state = GRPC_CHTTP2_DATA_FH_0; 242 grpc_slice_unref_internal(slice); 243 return GRPC_ERROR_NONE; 244 } else if (remaining < p->frame_size) { 245 s->stats.incoming.data_bytes += remaining; 246 if (GRPC_ERROR_NONE != 247 (error = p->parsing_frame->Push( 248 grpc_slice_sub(slice, static_cast<size_t>(cur - beg), 249 static_cast<size_t>(end - beg)), 250 slice_out))) { 251 return error; 252 } 253 p->frame_size -= remaining; 254 grpc_slice_unref_internal(slice); 255 return GRPC_ERROR_NONE; 256 } else { 257 GPR_ASSERT(remaining > p->frame_size); 258 s->stats.incoming.data_bytes += p->frame_size; 259 if (GRPC_ERROR_NONE != 260 p->parsing_frame->Push( 261 grpc_slice_sub( 262 slice, static_cast<size_t>(cur - beg), 263 static_cast<size_t>(cur + p->frame_size - beg)), 264 slice_out)) { 265 grpc_slice_unref_internal(slice); 266 return error; 267 } 268 if (GRPC_ERROR_NONE != 269 (error = p->parsing_frame->Finished(GRPC_ERROR_NONE, true))) { 270 grpc_slice_unref_internal(slice); 271 return error; 272 } 273 p->parsing_frame = nullptr; 274 p->state = GRPC_CHTTP2_DATA_FH_0; 275 cur += p->frame_size; 276 grpc_slice_buffer_undo_take_first( 277 slices, grpc_slice_sub(slice, static_cast<size_t>(cur - beg), 278 static_cast<size_t>(end - beg))); 279 grpc_slice_unref_internal(slice); 280 return GRPC_ERROR_NONE; 281 } 282 } 283 } 284 } 285 286 return GRPC_ERROR_NONE; 287 } 288 289 grpc_error* grpc_chttp2_data_parser_parse(void* parser, 290 grpc_chttp2_transport* t, 291 grpc_chttp2_stream* s, 292 grpc_slice slice, int is_last) { 293 if (!s->pending_byte_stream) { 294 grpc_slice_ref_internal(slice); 295 grpc_slice_buffer_add(&s->frame_storage, slice); 296 grpc_chttp2_maybe_complete_recv_message(t, s); 297 } else if (s->on_next) { 298 GPR_ASSERT(s->frame_storage.length == 0); 299 grpc_slice_ref_internal(slice); 300 grpc_slice_buffer_add(&s->unprocessed_incoming_frames_buffer, slice); 301 GRPC_CLOSURE_SCHED(s->on_next, GRPC_ERROR_NONE); 302 s->on_next = nullptr; 303 s->unprocessed_incoming_frames_decompressed = false; 304 } else { 305 grpc_slice_ref_internal(slice); 306 grpc_slice_buffer_add(&s->frame_storage, slice); 307 } 308 309 if (is_last && s->received_last_frame) { 310 grpc_chttp2_mark_stream_closed(t, s, true, false, GRPC_ERROR_NONE); 311 } 312 313 return GRPC_ERROR_NONE; 314 } 315