Home | History | Annotate | Download | only in transport
      1 /*
      2  *
      3  * Copyright 2015 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H
     20 #define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H
     21 
     22 #include <grpc/support/port_platform.h>
     23 
     24 #include <assert.h>
     25 #include <stdbool.h>
     26 
     27 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
     28 #include "src/core/ext/transport/chttp2/transport/frame.h"
     29 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
     30 #include "src/core/ext/transport/chttp2/transport/frame_goaway.h"
     31 #include "src/core/ext/transport/chttp2/transport/frame_ping.h"
     32 #include "src/core/ext/transport/chttp2/transport/frame_rst_stream.h"
     33 #include "src/core/ext/transport/chttp2/transport/frame_settings.h"
     34 #include "src/core/ext/transport/chttp2/transport/frame_window_update.h"
     35 #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
     36 #include "src/core/ext/transport/chttp2/transport/hpack_parser.h"
     37 #include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
     38 #include "src/core/ext/transport/chttp2/transport/stream_map.h"
     39 #include "src/core/lib/compression/stream_compression.h"
     40 #include "src/core/lib/gprpp/manual_constructor.h"
     41 #include "src/core/lib/iomgr/combiner.h"
     42 #include "src/core/lib/iomgr/endpoint.h"
     43 #include "src/core/lib/iomgr/timer.h"
     44 #include "src/core/lib/transport/connectivity_state.h"
     45 #include "src/core/lib/transport/transport_impl.h"
     46 
     47 /* streams are kept in various linked lists depending on what things need to
     48    happen to them... this enum labels each list */
     49 typedef enum {
     50   GRPC_CHTTP2_LIST_WRITABLE,
     51   GRPC_CHTTP2_LIST_WRITING,
     52   GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT,
     53   GRPC_CHTTP2_LIST_STALLED_BY_STREAM,
     54   /** streams that are waiting to start because there are too many concurrent
     55       streams on the connection */
     56   GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY,
     57   STREAM_LIST_COUNT /* must be last */
     58 } grpc_chttp2_stream_list_id;
     59 
     60 typedef enum {
     61   GRPC_CHTTP2_WRITE_STATE_IDLE,
     62   GRPC_CHTTP2_WRITE_STATE_WRITING,
     63   GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
     64 } grpc_chttp2_write_state;
     65 
     66 typedef enum {
     67   GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY,
     68   GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT,
     69 } grpc_chttp2_optimization_target;
     70 
     71 typedef enum {
     72   GRPC_CHTTP2_PCL_INITIATE = 0,
     73   GRPC_CHTTP2_PCL_NEXT,
     74   GRPC_CHTTP2_PCL_INFLIGHT,
     75   GRPC_CHTTP2_PCL_COUNT /* must be last */
     76 } grpc_chttp2_ping_closure_list;
     77 
     78 typedef enum {
     79   GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE,
     80   GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM,
     81   GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE,
     82   GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA,
     83   GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA,
     84   GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING,
     85   GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS,
     86   GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT,
     87   GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM,
     88   GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API,
     89   GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL,
     90   GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL,
     91   GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS,
     92   GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING,
     93   GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE,
     94   GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING,
     95   GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING,
     96   GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED,
     97   GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE,
     98   GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM,
     99 } grpc_chttp2_initiate_write_reason;
    100 
    101 const char* grpc_chttp2_initiate_write_reason_string(
    102     grpc_chttp2_initiate_write_reason reason);
    103 
    104 typedef struct {
    105   grpc_closure_list lists[GRPC_CHTTP2_PCL_COUNT];
    106   uint64_t inflight_id;
    107 } grpc_chttp2_ping_queue;
    108 
    109 typedef struct {
    110   int max_pings_without_data;
    111   int max_ping_strikes;
    112   grpc_millis min_sent_ping_interval_without_data;
    113   grpc_millis min_recv_ping_interval_without_data;
    114 } grpc_chttp2_repeated_ping_policy;
    115 
    116 typedef struct {
    117   grpc_millis last_ping_sent_time;
    118   int pings_before_data_required;
    119   grpc_timer delayed_ping_timer;
    120   bool is_delayed_ping_timer_set;
    121 } grpc_chttp2_repeated_ping_state;
    122 
    123 typedef struct {
    124   grpc_millis last_ping_recv_time;
    125   int ping_strikes;
    126 } grpc_chttp2_server_ping_recv_state;
    127 
    128 /* deframer state for the overall http2 stream of bytes */
    129 typedef enum {
    130   /* prefix: one entry per http2 connection prefix byte */
    131   GRPC_DTS_CLIENT_PREFIX_0 = 0,
    132   GRPC_DTS_CLIENT_PREFIX_1,
    133   GRPC_DTS_CLIENT_PREFIX_2,
    134   GRPC_DTS_CLIENT_PREFIX_3,
    135   GRPC_DTS_CLIENT_PREFIX_4,
    136   GRPC_DTS_CLIENT_PREFIX_5,
    137   GRPC_DTS_CLIENT_PREFIX_6,
    138   GRPC_DTS_CLIENT_PREFIX_7,
    139   GRPC_DTS_CLIENT_PREFIX_8,
    140   GRPC_DTS_CLIENT_PREFIX_9,
    141   GRPC_DTS_CLIENT_PREFIX_10,
    142   GRPC_DTS_CLIENT_PREFIX_11,
    143   GRPC_DTS_CLIENT_PREFIX_12,
    144   GRPC_DTS_CLIENT_PREFIX_13,
    145   GRPC_DTS_CLIENT_PREFIX_14,
    146   GRPC_DTS_CLIENT_PREFIX_15,
    147   GRPC_DTS_CLIENT_PREFIX_16,
    148   GRPC_DTS_CLIENT_PREFIX_17,
    149   GRPC_DTS_CLIENT_PREFIX_18,
    150   GRPC_DTS_CLIENT_PREFIX_19,
    151   GRPC_DTS_CLIENT_PREFIX_20,
    152   GRPC_DTS_CLIENT_PREFIX_21,
    153   GRPC_DTS_CLIENT_PREFIX_22,
    154   GRPC_DTS_CLIENT_PREFIX_23,
    155   /* frame header byte 0... */
    156   /* must follow from the prefix states */
    157   GRPC_DTS_FH_0,
    158   GRPC_DTS_FH_1,
    159   GRPC_DTS_FH_2,
    160   GRPC_DTS_FH_3,
    161   GRPC_DTS_FH_4,
    162   GRPC_DTS_FH_5,
    163   GRPC_DTS_FH_6,
    164   GRPC_DTS_FH_7,
    165   /* ... frame header byte 8 */
    166   GRPC_DTS_FH_8,
    167   /* inside a http2 frame */
    168   GRPC_DTS_FRAME
    169 } grpc_chttp2_deframe_transport_state;
    170 
    171 typedef struct {
    172   grpc_chttp2_stream* head;
    173   grpc_chttp2_stream* tail;
    174 } grpc_chttp2_stream_list;
    175 
    176 typedef struct {
    177   grpc_chttp2_stream* next;
    178   grpc_chttp2_stream* prev;
    179 } grpc_chttp2_stream_link;
    180 
    181 /* We keep several sets of connection wide parameters */
    182 typedef enum {
    183   /* The settings our peer has asked for (and we have acked) */
    184   GRPC_PEER_SETTINGS = 0,
    185   /* The settings we'd like to have */
    186   GRPC_LOCAL_SETTINGS,
    187   /* The settings we've published to our peer */
    188   GRPC_SENT_SETTINGS,
    189   /* The settings the peer has acked */
    190   GRPC_ACKED_SETTINGS,
    191   GRPC_NUM_SETTING_SETS
    192 } grpc_chttp2_setting_set;
    193 
    194 typedef enum {
    195   GRPC_CHTTP2_NO_GOAWAY_SEND,
    196   GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED,
    197   GRPC_CHTTP2_GOAWAY_SENT,
    198 } grpc_chttp2_sent_goaway_state;
    199 
    200 typedef struct grpc_chttp2_write_cb {
    201   int64_t call_at_byte;
    202   grpc_closure* closure;
    203   struct grpc_chttp2_write_cb* next;
    204 } grpc_chttp2_write_cb;
    205 
    206 namespace grpc_core {
    207 
    208 class Chttp2IncomingByteStream : public ByteStream {
    209  public:
    210   Chttp2IncomingByteStream(grpc_chttp2_transport* transport,
    211                            grpc_chttp2_stream* stream, uint32_t frame_size,
    212                            uint32_t flags);
    213 
    214   void Orphan() override;
    215 
    216   bool Next(size_t max_size_hint, grpc_closure* on_complete) override;
    217   grpc_error* Pull(grpc_slice* slice) override;
    218   void Shutdown(grpc_error* error) override;
    219 
    220   // TODO(roth): When I converted this class to C++, I wanted to make it
    221   // inherit from RefCounted or InternallyRefCounted instead of continuing
    222   // to use its own custom ref-counting code.  However, that would require
    223   // using multiple inheritence, which sucks in general.  And to make matters
    224   // worse, it causes problems with our New<> and Delete<> wrappers.
    225   // Specifically, unless RefCounted is first in the list of parent classes,
    226   // it will see a different value of the address of the object than the one
    227   // we actually allocated, in which case gpr_free() will be called on a
    228   // different address than the one we got from gpr_malloc(), thus causing a
    229   // crash.  Given the fragility of depending on that, as well as a desire to
    230   // avoid multiple inheritence in general, I've decided to leave this
    231   // alone for now.  We can revisit this once we're able to link against
    232   // libc++, at which point we can eliminate New<> and Delete<> and
    233   // switch to std::shared_ptr<>.
    234   void Ref();
    235   void Unref();
    236 
    237   void PublishError(grpc_error* error);
    238 
    239   grpc_error* Push(grpc_slice slice, grpc_slice* slice_out);
    240 
    241   grpc_error* Finished(grpc_error* error, bool reset_on_error);
    242 
    243   uint32_t remaining_bytes() const { return remaining_bytes_; }
    244 
    245  private:
    246   static void NextLocked(void* arg, grpc_error* error_ignored);
    247   static void OrphanLocked(void* arg, grpc_error* error_ignored);
    248 
    249   void MaybeCreateStreamDecompressionCtx();
    250 
    251   grpc_chttp2_transport* transport_;  // Immutable.
    252   grpc_chttp2_stream* stream_;        // Immutable.
    253 
    254   gpr_refcount refs_;
    255 
    256   /* Accessed only by transport thread when stream->pending_byte_stream == false
    257    * Accessed only by application thread when stream->pending_byte_stream ==
    258    * true */
    259   uint32_t remaining_bytes_;
    260 
    261   /* Accessed only by transport thread when stream->pending_byte_stream == false
    262    * Accessed only by application thread when stream->pending_byte_stream ==
    263    * true */
    264   struct {
    265     grpc_closure closure;
    266     size_t max_size_hint;
    267     grpc_closure* on_complete;
    268   } next_action_;
    269   grpc_closure destroy_action_;
    270 };
    271 
    272 }  // namespace grpc_core
    273 
    274 typedef enum {
    275   GRPC_CHTTP2_KEEPALIVE_STATE_WAITING,
    276   GRPC_CHTTP2_KEEPALIVE_STATE_PINGING,
    277   GRPC_CHTTP2_KEEPALIVE_STATE_DYING,
    278   GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED,
    279 } grpc_chttp2_keepalive_state;
    280 
    281 struct grpc_chttp2_transport {
    282   grpc_transport base; /* must be first */
    283   gpr_refcount refs;
    284   grpc_endpoint* ep;
    285   char* peer_string;
    286 
    287   grpc_combiner* combiner;
    288 
    289   grpc_closure* notify_on_receive_settings;
    290 
    291   /** write execution state of the transport */
    292   grpc_chttp2_write_state write_state;
    293   /** is this the first write in a series of writes?
    294       set when we initiate writing from idle, cleared when we
    295       initiate writing from writing+more */
    296   bool is_first_write_in_batch;
    297 
    298   /** is the transport destroying itself? */
    299   uint8_t destroying;
    300   /** has the upper layer closed the transport? */
    301   grpc_error* closed_with_error;
    302 
    303   /** is there a read request to the endpoint outstanding? */
    304   uint8_t endpoint_reading;
    305 
    306   grpc_chttp2_optimization_target opt_target;
    307 
    308   /** various lists of streams */
    309   grpc_chttp2_stream_list lists[STREAM_LIST_COUNT];
    310 
    311   /** maps stream id to grpc_chttp2_stream objects */
    312   grpc_chttp2_stream_map stream_map;
    313 
    314   grpc_closure write_action_begin_locked;
    315   grpc_closure write_action;
    316   grpc_closure write_action_end_locked;
    317 
    318   grpc_closure read_action_locked;
    319 
    320   /** incoming read bytes */
    321   grpc_slice_buffer read_buffer;
    322 
    323   /** address to place a newly accepted stream - set and unset by
    324       grpc_chttp2_parsing_accept_stream; used by init_stream to
    325       publish the accepted server stream */
    326   grpc_chttp2_stream** accepting_stream;
    327 
    328   struct {
    329     /* accept stream callback */
    330     void (*accept_stream)(void* user_data, grpc_transport* transport,
    331                           const void* server_data);
    332     void* accept_stream_user_data;
    333 
    334     /** connectivity tracking */
    335     grpc_connectivity_state_tracker state_tracker;
    336   } channel_callback;
    337 
    338   /** data to write now */
    339   grpc_slice_buffer outbuf;
    340   /** hpack encoding */
    341   grpc_chttp2_hpack_compressor hpack_compressor;
    342   /** is this a client? */
    343   bool is_client;
    344 
    345   /** data to write next write */
    346   grpc_slice_buffer qbuf;
    347 
    348   /** how much data are we willing to buffer when the WRITE_BUFFER_HINT is set?
    349    */
    350   uint32_t write_buffer_size;
    351 
    352   /** Set to a grpc_error object if a goaway frame is received. By default, set
    353    * to GRPC_ERROR_NONE */
    354   grpc_error* goaway_error;
    355 
    356   grpc_chttp2_sent_goaway_state sent_goaway_state;
    357 
    358   /** are the local settings dirty and need to be sent? */
    359   bool dirtied_local_settings;
    360   /** have local settings been sent? */
    361   bool sent_local_settings;
    362   /** bitmask of setting indexes to send out */
    363   uint32_t force_send_settings;
    364   /** settings values */
    365   uint32_t settings[GRPC_NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS];
    366 
    367   /** what is the next stream id to be allocated by this peer?
    368       copied to next_stream_id in parsing when parsing commences */
    369   uint32_t next_stream_id;
    370 
    371   /** last new stream id */
    372   uint32_t last_new_stream_id;
    373 
    374   /** ping queues for various ping insertion points */
    375   grpc_chttp2_ping_queue ping_queue;
    376   grpc_chttp2_repeated_ping_policy ping_policy;
    377   grpc_chttp2_repeated_ping_state ping_state;
    378   uint64_t ping_ctr; /* unique id for pings */
    379   grpc_closure retry_initiate_ping_locked;
    380 
    381   /** ping acks */
    382   size_t ping_ack_count;
    383   size_t ping_ack_capacity;
    384   uint64_t* ping_acks;
    385   grpc_chttp2_server_ping_recv_state ping_recv_state;
    386 
    387   /** parser for headers */
    388   grpc_chttp2_hpack_parser hpack_parser;
    389   /** simple one shot parsers */
    390   union {
    391     grpc_chttp2_window_update_parser window_update;
    392     grpc_chttp2_settings_parser settings;
    393     grpc_chttp2_ping_parser ping;
    394     grpc_chttp2_rst_stream_parser rst_stream;
    395   } simple;
    396   /** parser for goaway frames */
    397   grpc_chttp2_goaway_parser goaway_parser;
    398 
    399   grpc_core::PolymorphicManualConstructor<
    400       grpc_core::chttp2::TransportFlowControlBase,
    401       grpc_core::chttp2::TransportFlowControl,
    402       grpc_core::chttp2::TransportFlowControlDisabled>
    403       flow_control;
    404   /** initial window change. This is tracked as we parse settings frames from
    405    * the remote peer. If there is a positive delta, then we will make all
    406    * streams readable since they may have become unstalled */
    407   int64_t initial_window_update = 0;
    408 
    409   /* deframing */
    410   grpc_chttp2_deframe_transport_state deframe_state;
    411   uint8_t incoming_frame_type;
    412   uint8_t incoming_frame_flags;
    413   uint8_t header_eof;
    414   bool is_first_frame;
    415   uint32_t expect_continuation_stream_id;
    416   uint32_t incoming_frame_size;
    417   uint32_t incoming_stream_id;
    418 
    419   /* active parser */
    420   void* parser_data;
    421   grpc_chttp2_stream* incoming_stream;
    422   grpc_error* (*parser)(void* parser_user_data, grpc_chttp2_transport* t,
    423                         grpc_chttp2_stream* s, grpc_slice slice, int is_last);
    424 
    425   grpc_chttp2_write_cb* write_cb_pool;
    426 
    427   /* bdp estimator */
    428   grpc_closure next_bdp_ping_timer_expired_locked;
    429   grpc_closure start_bdp_ping_locked;
    430   grpc_closure finish_bdp_ping_locked;
    431 
    432   /* if non-NULL, close the transport with this error when writes are finished
    433    */
    434   grpc_error* close_transport_on_writes_finished;
    435 
    436   /* a list of closures to run after writes are finished */
    437   grpc_closure_list run_after_write;
    438 
    439   /* buffer pool state */
    440   /** have we scheduled a benign cleanup? */
    441   bool benign_reclaimer_registered;
    442   /** have we scheduled a destructive cleanup? */
    443   bool destructive_reclaimer_registered;
    444   /** benign cleanup closure */
    445   grpc_closure benign_reclaimer_locked;
    446   /** destructive cleanup closure */
    447   grpc_closure destructive_reclaimer_locked;
    448 
    449   /* next bdp ping timer */
    450   bool have_next_bdp_ping_timer;
    451   grpc_timer next_bdp_ping_timer;
    452 
    453   /* keep-alive ping support */
    454   /** Closure to initialize a keepalive ping */
    455   grpc_closure init_keepalive_ping_locked;
    456   /** Closure to run when the keepalive ping is sent */
    457   grpc_closure start_keepalive_ping_locked;
    458   /** Cousure to run when the keepalive ping ack is received */
    459   grpc_closure finish_keepalive_ping_locked;
    460   /** Closrue to run when the keepalive ping timeouts */
    461   grpc_closure keepalive_watchdog_fired_locked;
    462   /** timer to initiate ping events */
    463   grpc_timer keepalive_ping_timer;
    464   /** watchdog to kill the transport when waiting for the keepalive ping */
    465   grpc_timer keepalive_watchdog_timer;
    466   /** time duration in between pings */
    467   grpc_millis keepalive_time;
    468   /** grace period for a ping to complete before watchdog kicks in */
    469   grpc_millis keepalive_timeout;
    470   /** if keepalive pings are allowed when there's no outstanding streams */
    471   bool keepalive_permit_without_calls;
    472   /** keep-alive state machine state */
    473   grpc_chttp2_keepalive_state keepalive_state;
    474 };
    475 
    476 typedef enum {
    477   GRPC_METADATA_NOT_PUBLISHED,
    478   GRPC_METADATA_SYNTHESIZED_FROM_FAKE,
    479   GRPC_METADATA_PUBLISHED_FROM_WIRE,
    480   GPRC_METADATA_PUBLISHED_AT_CLOSE
    481 } grpc_published_metadata_method;
    482 
    483 struct grpc_chttp2_stream {
    484   grpc_chttp2_transport* t;
    485   grpc_stream_refcount* refcount;
    486 
    487   grpc_closure destroy_stream;
    488   grpc_closure* destroy_stream_arg;
    489 
    490   grpc_chttp2_stream_link links[STREAM_LIST_COUNT];
    491   uint8_t included[STREAM_LIST_COUNT];
    492 
    493   /** HTTP2 stream id for this stream, or zero if one has not been assigned */
    494   uint32_t id;
    495 
    496   /** things the upper layers would like to send */
    497   grpc_metadata_batch* send_initial_metadata;
    498   grpc_closure* send_initial_metadata_finished;
    499   grpc_metadata_batch* send_trailing_metadata;
    500   grpc_closure* send_trailing_metadata_finished;
    501 
    502   grpc_core::OrphanablePtr<grpc_core::ByteStream> fetching_send_message;
    503   uint32_t fetched_send_message_length;
    504   grpc_slice fetching_slice;
    505   int64_t next_message_end_offset;
    506   int64_t flow_controlled_bytes_written;
    507   int64_t flow_controlled_bytes_flowed;
    508   grpc_closure complete_fetch_locked;
    509   grpc_closure* fetching_send_message_finished;
    510 
    511   grpc_metadata_batch* recv_initial_metadata;
    512   grpc_closure* recv_initial_metadata_ready;
    513   bool* trailing_metadata_available;
    514   grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message;
    515   grpc_closure* recv_message_ready;
    516   grpc_metadata_batch* recv_trailing_metadata;
    517   grpc_closure* recv_trailing_metadata_finished;
    518 
    519   grpc_transport_stream_stats* collecting_stats;
    520   grpc_transport_stream_stats stats;
    521 
    522   /** Is this stream closed for writing. */
    523   bool write_closed;
    524   /** Is this stream reading half-closed. */
    525   bool read_closed;
    526   /** Are all published incoming byte streams closed. */
    527   bool all_incoming_byte_streams_finished;
    528   /** Has this stream seen an error.
    529       If true, then pending incoming frames can be thrown away. */
    530   bool seen_error;
    531   /** Are we buffering writes on this stream? If yes, we won't become writable
    532       until there's enough queued up in the flow_controlled_buffer */
    533   bool write_buffering;
    534   /** Has trailing metadata been received. */
    535   bool received_trailing_metadata;
    536 
    537   /** the error that resulted in this stream being read-closed */
    538   grpc_error* read_closed_error;
    539   /** the error that resulted in this stream being write-closed */
    540   grpc_error* write_closed_error;
    541 
    542   grpc_published_metadata_method published_metadata[2];
    543   bool final_metadata_requested;
    544 
    545   grpc_chttp2_incoming_metadata_buffer metadata_buffer[2];
    546 
    547   grpc_slice_buffer frame_storage; /* protected by t combiner */
    548 
    549   /* Accessed only by transport thread when stream->pending_byte_stream == false
    550    * Accessed only by application thread when stream->pending_byte_stream ==
    551    * true */
    552   grpc_slice_buffer unprocessed_incoming_frames_buffer;
    553   grpc_closure* on_next;    /* protected by t combiner */
    554   bool pending_byte_stream; /* protected by t combiner */
    555   // cached length of buffer to be used by the transport thread in cases where
    556   // stream->pending_byte_stream == true. The value is saved before
    557   // application threads are allowed to modify
    558   // unprocessed_incoming_frames_buffer
    559   size_t unprocessed_incoming_frames_buffer_cached_length;
    560   grpc_closure reset_byte_stream;
    561   grpc_error* byte_stream_error; /* protected by t combiner */
    562   bool received_last_frame;      /* protected by t combiner */
    563 
    564   grpc_millis deadline;
    565 
    566   /** saw some stream level error */
    567   grpc_error* forced_close_error;
    568   /** how many header frames have we received? */
    569   uint8_t header_frames_received;
    570   /** parsing state for data frames */
    571   /* Accessed only by transport thread when stream->pending_byte_stream == false
    572    * Accessed only by application thread when stream->pending_byte_stream ==
    573    * true */
    574   grpc_chttp2_data_parser data_parser;
    575   /** number of bytes received - reset at end of parse thread execution */
    576   int64_t received_bytes;
    577 
    578   bool sent_initial_metadata;
    579   bool sent_trailing_metadata;
    580 
    581   grpc_core::PolymorphicManualConstructor<
    582       grpc_core::chttp2::StreamFlowControlBase,
    583       grpc_core::chttp2::StreamFlowControl,
    584       grpc_core::chttp2::StreamFlowControlDisabled>
    585       flow_control;
    586 
    587   grpc_slice_buffer flow_controlled_buffer;
    588 
    589   grpc_chttp2_write_cb* on_flow_controlled_cbs;
    590   grpc_chttp2_write_cb* on_write_finished_cbs;
    591   grpc_chttp2_write_cb* finish_after_write;
    592   size_t sending_bytes;
    593 
    594   /* Stream compression method to be used. */
    595   grpc_stream_compression_method stream_compression_method;
    596   /* Stream decompression method to be used. */
    597   grpc_stream_compression_method stream_decompression_method;
    598   /** Stream compression decompress context */
    599   grpc_stream_compression_context* stream_decompression_ctx;
    600   /** Stream compression compress context */
    601   grpc_stream_compression_context* stream_compression_ctx;
    602 
    603   /** Buffer storing data that is compressed but not sent */
    604   grpc_slice_buffer compressed_data_buffer;
    605   /** Amount of uncompressed bytes sent out when compressed_data_buffer is
    606    * emptied */
    607   size_t uncompressed_data_size;
    608   /** Temporary buffer storing decompressed data */
    609   grpc_slice_buffer decompressed_data_buffer;
    610   /** Whether bytes stored in unprocessed_incoming_byte_stream is decompressed
    611    */
    612   bool unprocessed_incoming_frames_decompressed;
    613   /** gRPC header bytes that are already decompressed */
    614   size_t decompressed_header_bytes;
    615 };
    616 
    617 /** Transport writing call flow:
    618     grpc_chttp2_initiate_write() is called anywhere that we know bytes need to
    619     go out on the wire.
    620     If no other write has been started, a task is enqueued onto our workqueue.
    621     When that task executes, it obtains the global lock, and gathers the data
    622     to write.
    623     The global lock is dropped and we do the syscall to write.
    624     After writing, a follow-up check is made to see if another round of writing
    625     should be performed.
    626 
    627     The actual call chain is documented in the implementation of this function.
    628     */
    629 void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
    630                                 grpc_chttp2_initiate_write_reason reason);
    631 
    632 typedef struct {
    633   /** are we writing? */
    634   bool writing;
    635   /** if writing: was it a complete flush (false) or a partial flush (true) */
    636   bool partial;
    637   /** did we queue any completions as part of beginning the write */
    638   bool early_results_scheduled;
    639 } grpc_chttp2_begin_write_result;
    640 
    641 grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
    642     grpc_chttp2_transport* t);
    643 void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error* error);
    644 
    645 /** Process one slice of incoming data; return 1 if the connection is still
    646     viable after reading, or 0 if the connection should be torn down */
    647 grpc_error* grpc_chttp2_perform_read(grpc_chttp2_transport* t,
    648                                      grpc_slice slice);
    649 
    650 bool grpc_chttp2_list_add_writable_stream(grpc_chttp2_transport* t,
    651                                           grpc_chttp2_stream* s);
    652 /** Get a writable stream
    653     returns non-zero if there was a stream available */
    654 bool grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport* t,
    655                                           grpc_chttp2_stream** s);
    656 bool grpc_chttp2_list_remove_writable_stream(grpc_chttp2_transport* t,
    657                                              grpc_chttp2_stream* s);
    658 
    659 bool grpc_chttp2_list_add_writing_stream(grpc_chttp2_transport* t,
    660                                          grpc_chttp2_stream* s);
    661 bool grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport* t);
    662 bool grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport* t,
    663                                          grpc_chttp2_stream** s);
    664 
    665 void grpc_chttp2_list_add_written_stream(grpc_chttp2_transport* t,
    666                                          grpc_chttp2_stream* s);
    667 bool grpc_chttp2_list_pop_written_stream(grpc_chttp2_transport* t,
    668                                          grpc_chttp2_stream** s);
    669 
    670 void grpc_chttp2_list_add_waiting_for_concurrency(grpc_chttp2_transport* t,
    671                                                   grpc_chttp2_stream* s);
    672 bool grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport* t,
    673                                                   grpc_chttp2_stream** s);
    674 void grpc_chttp2_list_remove_waiting_for_concurrency(grpc_chttp2_transport* t,
    675                                                      grpc_chttp2_stream* s);
    676 
    677 void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport* t,
    678                                                grpc_chttp2_stream* s);
    679 bool grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport* t,
    680                                                grpc_chttp2_stream** s);
    681 void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport* t,
    682                                                   grpc_chttp2_stream* s);
    683 
    684 void grpc_chttp2_list_add_stalled_by_stream(grpc_chttp2_transport* t,
    685                                             grpc_chttp2_stream* s);
    686 bool grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport* t,
    687                                             grpc_chttp2_stream** s);
    688 bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport* t,
    689                                                grpc_chttp2_stream* s);
    690 
    691 /********* Flow Control ***************/
    692 
    693 // Takes in a flow control action and performs all the needed operations.
    694 void grpc_chttp2_act_on_flowctl_action(
    695     const grpc_core::chttp2::FlowControlAction& action,
    696     grpc_chttp2_transport* t, grpc_chttp2_stream* s);
    697 
    698 /********* End of Flow Control ***************/
    699 
    700 grpc_chttp2_stream* grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport* t,
    701                                                       uint32_t id);
    702 grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
    703                                                       uint32_t id);
    704 
    705 void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
    706                                      uint32_t goaway_error,
    707                                      grpc_slice goaway_text);
    708 
    709 void grpc_chttp2_parsing_become_skip_parser(grpc_chttp2_transport* t);
    710 
    711 void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
    712                                        grpc_chttp2_stream* s,
    713                                        grpc_closure** pclosure,
    714                                        grpc_error* error, const char* desc);
    715 
    716 #define GRPC_HEADER_SIZE_IN_BYTES 5
    717 #define MAX_SIZE_T (~(size_t)0)
    718 
    719 #define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
    720 #define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \
    721   (sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1)
    722 
    723 // extern grpc_core::TraceFlag grpc_http_trace;
    724 // extern grpc_core::TraceFlag grpc_flowctl_trace;
    725 
    726 #define GRPC_CHTTP2_IF_TRACING(stmt) \
    727   if (!(grpc_http_trace.enabled()))  \
    728     ;                                \
    729   else                               \
    730     stmt
    731 
    732 void grpc_chttp2_fake_status(grpc_chttp2_transport* t,
    733                              grpc_chttp2_stream* stream, grpc_error* error);
    734 void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t,
    735                                     grpc_chttp2_stream* s, int close_reads,
    736                                     int close_writes, grpc_error* error);
    737 void grpc_chttp2_start_writing(grpc_chttp2_transport* t);
    738 
    739 #ifndef NDEBUG
    740 #define GRPC_CHTTP2_STREAM_REF(stream, reason) \
    741   grpc_chttp2_stream_ref(stream, reason)
    742 #define GRPC_CHTTP2_STREAM_UNREF(stream, reason) \
    743   grpc_chttp2_stream_unref(stream, reason)
    744 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s, const char* reason);
    745 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s, const char* reason);
    746 #else
    747 #define GRPC_CHTTP2_STREAM_REF(stream, reason) grpc_chttp2_stream_ref(stream)
    748 #define GRPC_CHTTP2_STREAM_UNREF(stream, reason) \
    749   grpc_chttp2_stream_unref(stream)
    750 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s);
    751 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s);
    752 #endif
    753 
    754 #ifndef NDEBUG
    755 #define GRPC_CHTTP2_REF_TRANSPORT(t, r) \
    756   grpc_chttp2_ref_transport(t, r, __FILE__, __LINE__)
    757 #define GRPC_CHTTP2_UNREF_TRANSPORT(t, r) \
    758   grpc_chttp2_unref_transport(t, r, __FILE__, __LINE__)
    759 void grpc_chttp2_unref_transport(grpc_chttp2_transport* t, const char* reason,
    760                                  const char* file, int line);
    761 void grpc_chttp2_ref_transport(grpc_chttp2_transport* t, const char* reason,
    762                                const char* file, int line);
    763 #else
    764 #define GRPC_CHTTP2_REF_TRANSPORT(t, r) grpc_chttp2_ref_transport(t)
    765 #define GRPC_CHTTP2_UNREF_TRANSPORT(t, r) grpc_chttp2_unref_transport(t)
    766 void grpc_chttp2_unref_transport(grpc_chttp2_transport* t);
    767 void grpc_chttp2_ref_transport(grpc_chttp2_transport* t);
    768 #endif
    769 
    770 void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id);
    771 
    772 /** Add a new ping strike to ping_recv_state.ping_strikes. If
    773     ping_recv_state.ping_strikes > ping_policy.max_ping_strikes, it sends GOAWAY
    774     with error code ENHANCE_YOUR_CALM and additional debug data resembling
    775     "too_many_pings" followed by immediately closing the connection. */
    776 void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t);
    777 
    778 /** add a ref to the stream and add it to the writable list;
    779     ref will be dropped in writing.c */
    780 void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
    781                                       grpc_chttp2_stream* s);
    782 
    783 void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
    784                                grpc_error* due_to_error);
    785 
    786 void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport* t,
    787                                                       grpc_chttp2_stream* s);
    788 void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
    789                                              grpc_chttp2_stream* s);
    790 void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
    791                                                        grpc_chttp2_stream* s);
    792 
    793 void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
    794                                      grpc_chttp2_stream* s, grpc_error* error);
    795 
    796 /** Set the default keepalive configurations, must only be called at
    797     initialization */
    798 void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
    799                                                bool is_client);
    800 
    801 #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */
    802