Home | History | Annotate | Download | only in transport
      1 /*
      2  *
      3  * Copyright 2015 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #include <grpc/support/port_platform.h>
     20 
     21 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
     22 
     23 #include <string.h>
     24 
     25 #include <grpc/support/alloc.h>
     26 #include <grpc/support/log.h>
     27 #include <grpc/support/string_util.h>
     28 #include "src/core/ext/transport/chttp2/transport/internal.h"
     29 #include "src/core/lib/gpr/string.h"
     30 #include "src/core/lib/gprpp/memory.h"
     31 #include "src/core/lib/slice/slice_internal.h"
     32 #include "src/core/lib/slice/slice_string_helpers.h"
     33 #include "src/core/lib/transport/transport.h"
     34 
     35 grpc_error* grpc_chttp2_data_parser_init(grpc_chttp2_data_parser* parser) {
     36   parser->state = GRPC_CHTTP2_DATA_FH_0;
     37   parser->parsing_frame = nullptr;
     38   return GRPC_ERROR_NONE;
     39 }
     40 
     41 void grpc_chttp2_data_parser_destroy(grpc_chttp2_data_parser* parser) {
     42   if (parser->parsing_frame != nullptr) {
     43     GRPC_ERROR_UNREF(parser->parsing_frame->Finished(
     44         GRPC_ERROR_CREATE_FROM_STATIC_STRING("Parser destroyed"), false));
     45   }
     46   GRPC_ERROR_UNREF(parser->error);
     47 }
     48 
     49 grpc_error* grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser* parser,
     50                                                 uint8_t flags,
     51                                                 uint32_t stream_id,
     52                                                 grpc_chttp2_stream* s) {
     53   if (flags & ~GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
     54     char* msg;
     55     gpr_asprintf(&msg, "unsupported data flags: 0x%02x", flags);
     56     grpc_error* err = grpc_error_set_int(
     57         GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg), GRPC_ERROR_INT_STREAM_ID,
     58         static_cast<intptr_t>(stream_id));
     59     gpr_free(msg);
     60     return err;
     61   }
     62 
     63   if (flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
     64     s->received_last_frame = true;
     65   } else {
     66     s->received_last_frame = false;
     67   }
     68 
     69   return GRPC_ERROR_NONE;
     70 }
     71 
     72 void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer* inbuf,
     73                              uint32_t write_bytes, int is_eof,
     74                              grpc_transport_one_way_stats* stats,
     75                              grpc_slice_buffer* outbuf) {
     76   grpc_slice hdr;
     77   uint8_t* p;
     78   static const size_t header_size = 9;
     79 
     80   hdr = GRPC_SLICE_MALLOC(header_size);
     81   p = GRPC_SLICE_START_PTR(hdr);
     82   GPR_ASSERT(write_bytes < (1 << 24));
     83   *p++ = static_cast<uint8_t>(write_bytes >> 16);
     84   *p++ = static_cast<uint8_t>(write_bytes >> 8);
     85   *p++ = static_cast<uint8_t>(write_bytes);
     86   *p++ = GRPC_CHTTP2_FRAME_DATA;
     87   *p++ = is_eof ? GRPC_CHTTP2_DATA_FLAG_END_STREAM : 0;
     88   *p++ = static_cast<uint8_t>(id >> 24);
     89   *p++ = static_cast<uint8_t>(id >> 16);
     90   *p++ = static_cast<uint8_t>(id >> 8);
     91   *p++ = static_cast<uint8_t>(id);
     92   grpc_slice_buffer_add(outbuf, hdr);
     93 
     94   grpc_slice_buffer_move_first_no_ref(inbuf, write_bytes, outbuf);
     95 
     96   stats->framing_bytes += header_size;
     97   stats->data_bytes += write_bytes;
     98 }
     99 
    100 grpc_error* grpc_deframe_unprocessed_incoming_frames(
    101     grpc_chttp2_data_parser* p, grpc_chttp2_stream* s,
    102     grpc_slice_buffer* slices, grpc_slice* slice_out,
    103     grpc_core::OrphanablePtr<grpc_core::ByteStream>* stream_out) {
    104   grpc_error* error = GRPC_ERROR_NONE;
    105   grpc_chttp2_transport* t = s->t;
    106 
    107   while (slices->count > 0) {
    108     uint8_t* beg = nullptr;
    109     uint8_t* end = nullptr;
    110     uint8_t* cur = nullptr;
    111 
    112     grpc_slice slice = grpc_slice_buffer_take_first(slices);
    113 
    114     beg = GRPC_SLICE_START_PTR(slice);
    115     end = GRPC_SLICE_END_PTR(slice);
    116     cur = beg;
    117     uint32_t message_flags;
    118     char* msg;
    119 
    120     if (cur == end) {
    121       grpc_slice_unref_internal(slice);
    122       continue;
    123     }
    124 
    125     switch (p->state) {
    126       case GRPC_CHTTP2_DATA_ERROR:
    127         p->state = GRPC_CHTTP2_DATA_ERROR;
    128         grpc_slice_unref_internal(slice);
    129         return GRPC_ERROR_REF(p->error);
    130       case GRPC_CHTTP2_DATA_FH_0:
    131         s->stats.incoming.framing_bytes++;
    132         p->frame_type = *cur;
    133         switch (p->frame_type) {
    134           case 0:
    135             p->is_frame_compressed = false; /* GPR_FALSE */
    136             break;
    137           case 1:
    138             p->is_frame_compressed = true; /* GPR_TRUE */
    139             break;
    140           default:
    141             gpr_asprintf(&msg, "Bad GRPC frame type 0x%02x", p->frame_type);
    142             p->error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
    143             p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID,
    144                                           static_cast<intptr_t>(s->id));
    145             gpr_free(msg);
    146             msg = grpc_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
    147             p->error = grpc_error_set_str(p->error, GRPC_ERROR_STR_RAW_BYTES,
    148                                           grpc_slice_from_copied_string(msg));
    149             gpr_free(msg);
    150             p->error =
    151                 grpc_error_set_int(p->error, GRPC_ERROR_INT_OFFSET, cur - beg);
    152             p->state = GRPC_CHTTP2_DATA_ERROR;
    153             grpc_slice_unref_internal(slice);
    154             return GRPC_ERROR_REF(p->error);
    155         }
    156         if (++cur == end) {
    157           p->state = GRPC_CHTTP2_DATA_FH_1;
    158           grpc_slice_unref_internal(slice);
    159           continue;
    160         }
    161       /* fallthrough */
    162       case GRPC_CHTTP2_DATA_FH_1:
    163         s->stats.incoming.framing_bytes++;
    164         p->frame_size = (static_cast<uint32_t>(*cur)) << 24;
    165         if (++cur == end) {
    166           p->state = GRPC_CHTTP2_DATA_FH_2;
    167           grpc_slice_unref_internal(slice);
    168           continue;
    169         }
    170       /* fallthrough */
    171       case GRPC_CHTTP2_DATA_FH_2:
    172         s->stats.incoming.framing_bytes++;
    173         p->frame_size |= (static_cast<uint32_t>(*cur)) << 16;
    174         if (++cur == end) {
    175           p->state = GRPC_CHTTP2_DATA_FH_3;
    176           grpc_slice_unref_internal(slice);
    177           continue;
    178         }
    179       /* fallthrough */
    180       case GRPC_CHTTP2_DATA_FH_3:
    181         s->stats.incoming.framing_bytes++;
    182         p->frame_size |= (static_cast<uint32_t>(*cur)) << 8;
    183         if (++cur == end) {
    184           p->state = GRPC_CHTTP2_DATA_FH_4;
    185           grpc_slice_unref_internal(slice);
    186           continue;
    187         }
    188       /* fallthrough */
    189       case GRPC_CHTTP2_DATA_FH_4:
    190         s->stats.incoming.framing_bytes++;
    191         GPR_ASSERT(stream_out != nullptr);
    192         GPR_ASSERT(p->parsing_frame == nullptr);
    193         p->frame_size |= (static_cast<uint32_t>(*cur));
    194         p->state = GRPC_CHTTP2_DATA_FRAME;
    195         ++cur;
    196         message_flags = 0;
    197         if (p->is_frame_compressed) {
    198           message_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
    199         }
    200         p->parsing_frame = grpc_core::New<grpc_core::Chttp2IncomingByteStream>(
    201             t, s, p->frame_size, message_flags);
    202         stream_out->reset(p->parsing_frame);
    203         if (p->parsing_frame->remaining_bytes() == 0) {
    204           GRPC_ERROR_UNREF(p->parsing_frame->Finished(GRPC_ERROR_NONE, true));
    205           p->parsing_frame = nullptr;
    206           p->state = GRPC_CHTTP2_DATA_FH_0;
    207         }
    208         s->pending_byte_stream = true;
    209 
    210         if (cur != end) {
    211           grpc_slice_buffer_undo_take_first(
    212               slices, grpc_slice_sub(slice, static_cast<size_t>(cur - beg),
    213                                      static_cast<size_t>(end - beg)));
    214         }
    215         grpc_slice_unref_internal(slice);
    216         return GRPC_ERROR_NONE;
    217       case GRPC_CHTTP2_DATA_FRAME: {
    218         GPR_ASSERT(p->parsing_frame != nullptr);
    219         GPR_ASSERT(slice_out != nullptr);
    220         if (cur == end) {
    221           grpc_slice_unref_internal(slice);
    222           continue;
    223         }
    224         uint32_t remaining = static_cast<uint32_t>(end - cur);
    225         if (remaining == p->frame_size) {
    226           s->stats.incoming.data_bytes += remaining;
    227           if (GRPC_ERROR_NONE !=
    228               (error = p->parsing_frame->Push(
    229                    grpc_slice_sub(slice, static_cast<size_t>(cur - beg),
    230                                   static_cast<size_t>(end - beg)),
    231                    slice_out))) {
    232             grpc_slice_unref_internal(slice);
    233             return error;
    234           }
    235           if (GRPC_ERROR_NONE !=
    236               (error = p->parsing_frame->Finished(GRPC_ERROR_NONE, true))) {
    237             grpc_slice_unref_internal(slice);
    238             return error;
    239           }
    240           p->parsing_frame = nullptr;
    241           p->state = GRPC_CHTTP2_DATA_FH_0;
    242           grpc_slice_unref_internal(slice);
    243           return GRPC_ERROR_NONE;
    244         } else if (remaining < p->frame_size) {
    245           s->stats.incoming.data_bytes += remaining;
    246           if (GRPC_ERROR_NONE !=
    247               (error = p->parsing_frame->Push(
    248                    grpc_slice_sub(slice, static_cast<size_t>(cur - beg),
    249                                   static_cast<size_t>(end - beg)),
    250                    slice_out))) {
    251             return error;
    252           }
    253           p->frame_size -= remaining;
    254           grpc_slice_unref_internal(slice);
    255           return GRPC_ERROR_NONE;
    256         } else {
    257           GPR_ASSERT(remaining > p->frame_size);
    258           s->stats.incoming.data_bytes += p->frame_size;
    259           if (GRPC_ERROR_NONE !=
    260               p->parsing_frame->Push(
    261                   grpc_slice_sub(
    262                       slice, static_cast<size_t>(cur - beg),
    263                       static_cast<size_t>(cur + p->frame_size - beg)),
    264                   slice_out)) {
    265             grpc_slice_unref_internal(slice);
    266             return error;
    267           }
    268           if (GRPC_ERROR_NONE !=
    269               (error = p->parsing_frame->Finished(GRPC_ERROR_NONE, true))) {
    270             grpc_slice_unref_internal(slice);
    271             return error;
    272           }
    273           p->parsing_frame = nullptr;
    274           p->state = GRPC_CHTTP2_DATA_FH_0;
    275           cur += p->frame_size;
    276           grpc_slice_buffer_undo_take_first(
    277               slices, grpc_slice_sub(slice, static_cast<size_t>(cur - beg),
    278                                      static_cast<size_t>(end - beg)));
    279           grpc_slice_unref_internal(slice);
    280           return GRPC_ERROR_NONE;
    281         }
    282       }
    283     }
    284   }
    285 
    286   return GRPC_ERROR_NONE;
    287 }
    288 
    289 grpc_error* grpc_chttp2_data_parser_parse(void* parser,
    290                                           grpc_chttp2_transport* t,
    291                                           grpc_chttp2_stream* s,
    292                                           grpc_slice slice, int is_last) {
    293   if (!s->pending_byte_stream) {
    294     grpc_slice_ref_internal(slice);
    295     grpc_slice_buffer_add(&s->frame_storage, slice);
    296     grpc_chttp2_maybe_complete_recv_message(t, s);
    297   } else if (s->on_next) {
    298     GPR_ASSERT(s->frame_storage.length == 0);
    299     grpc_slice_ref_internal(slice);
    300     grpc_slice_buffer_add(&s->unprocessed_incoming_frames_buffer, slice);
    301     GRPC_CLOSURE_SCHED(s->on_next, GRPC_ERROR_NONE);
    302     s->on_next = nullptr;
    303     s->unprocessed_incoming_frames_decompressed = false;
    304   } else {
    305     grpc_slice_ref_internal(slice);
    306     grpc_slice_buffer_add(&s->frame_storage, slice);
    307   }
    308 
    309   if (is_last && s->received_last_frame) {
    310     grpc_chttp2_mark_stream_closed(t, s, true, false, GRPC_ERROR_NONE);
    311   }
    312 
    313   return GRPC_ERROR_NONE;
    314 }
    315