Home | History | Annotate | Download | only in transport
      1 /*
      2  *
      3  * Copyright 2015 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #ifndef GRPC_CORE_LIB_TRANSPORT_TRANSPORT_H
     20 #define GRPC_CORE_LIB_TRANSPORT_TRANSPORT_H
     21 
     22 #include <grpc/support/port_platform.h>
     23 
     24 #include <stddef.h>
     25 
     26 #include "src/core/lib/channel/context.h"
     27 #include "src/core/lib/gpr/arena.h"
     28 #include "src/core/lib/iomgr/call_combiner.h"
     29 #include "src/core/lib/iomgr/endpoint.h"
     30 #include "src/core/lib/iomgr/polling_entity.h"
     31 #include "src/core/lib/iomgr/pollset.h"
     32 #include "src/core/lib/iomgr/pollset_set.h"
     33 #include "src/core/lib/transport/byte_stream.h"
     34 #include "src/core/lib/transport/metadata_batch.h"
     35 
     36 /* Minimum and maximum accepted protocol versions (currently both 2.1). */
     37 #define GRPC_PROTOCOL_VERSION_MAX_MAJOR 2
     38 #define GRPC_PROTOCOL_VERSION_MAX_MINOR 1
     39 #define GRPC_PROTOCOL_VERSION_MIN_MAJOR 2
     40 #define GRPC_PROTOCOL_VERSION_MIN_MINOR 1
     41 
     42 /* Forward declarations: this header only uses these types via pointers. */
     43 
     44 typedef struct grpc_transport grpc_transport;
     45 
     46 /* grpc_stream doesn't actually exist. It's used as a typesafe
     47    opaque pointer for whatever data the transport wants to track
     48    for a stream. */
     49 typedef struct grpc_stream grpc_stream;
     50 
     51 extern grpc_core::DebugOnlyTraceFlag grpc_trace_stream_refcount; /* NOTE(review): presumably gates stream ref/unref tracing; defined elsewhere */
     52 
     53 typedef struct grpc_stream_refcount { /* Initialized via grpc_stream_ref_init / GRPC_STREAM_REF_INIT. */
     54   gpr_refcount refs; /* the reference count itself */
     55   grpc_closure destroy; /* NOTE(review): presumably built from the cb/cb_arg given at init and run on last unref - confirm */
     56 #ifndef NDEBUG
     57   const char* object_type; /* only present in debug builds (no NDEBUG) */
     58 #endif
     59   grpc_slice_refcount slice_refcount; /* lets slices share this refcount; see grpc_slice_from_stream_owned_buffer */
     60 } grpc_stream_refcount;
     61 
     62 #ifndef NDEBUG /* Debug builds: callers also pass descriptive strings. */
     63 void grpc_stream_ref_init(grpc_stream_refcount* refcount, int initial_refs,
     64                           grpc_iomgr_cb_func cb, void* cb_arg,
     65                           const char* object_type);
     66 void grpc_stream_ref(grpc_stream_refcount* refcount, const char* reason);
     67 void grpc_stream_unref(grpc_stream_refcount* refcount, const char* reason);
     68 #define GRPC_STREAM_REF_INIT(rc, ir, cb, cb_arg, objtype) \
     69   grpc_stream_ref_init(rc, ir, cb, cb_arg, objtype)
     70 #else /* NDEBUG: same API without the object_type/reason strings. */
     71 void grpc_stream_ref_init(grpc_stream_refcount* refcount, int initial_refs,
     72                           grpc_iomgr_cb_func cb, void* cb_arg);
     73 void grpc_stream_ref(grpc_stream_refcount* refcount);
     74 void grpc_stream_unref(grpc_stream_refcount* refcount);
     75 #define GRPC_STREAM_REF_INIT(rc, ir, cb, cb_arg, objtype) \
     76   grpc_stream_ref_init(rc, ir, cb, cb_arg) /* objtype is discarded */
     77 #endif /* NDEBUG */
     78 
     79 /* Wrap a buffer (\a buffer, \a length) that is owned by some stream object
     80    into a slice that shares the stream's refcount (\a refcount). */
     81 grpc_slice grpc_slice_from_stream_owned_buffer(grpc_stream_refcount* refcount,
     82                                                void* buffer, size_t length);
     83 
     84 typedef struct { /* Byte counters for one direction of a stream. */
     85   uint64_t framing_bytes; /* NOTE(review): presumably framing overhead - confirm */
     86   uint64_t data_bytes; /* NOTE(review): presumably message payload bytes - confirm */
     87   uint64_t header_bytes; /* NOTE(review): presumably metadata/header bytes - confirm */
     88 } grpc_transport_one_way_stats;
     89 
     90 typedef struct grpc_transport_stream_stats { /* One set of counters per direction. */
     91   grpc_transport_one_way_stats incoming;
     92   grpc_transport_one_way_stats outgoing;
     93 } grpc_transport_stream_stats;
     94 
     95 void grpc_transport_move_one_way_stats(grpc_transport_one_way_stats* from,
     96                                        grpc_transport_one_way_stats* to); /* NOTE(review): "move" semantics not shown here - confirm */
     97 
     98 void grpc_transport_move_stats(grpc_transport_stream_stats* from,
     99                                grpc_transport_stream_stats* to); /* NOTE(review): presumably moves both directions - confirm */
    100 
    101 // This struct (which is present in both grpc_transport_stream_op_batch
    102 // and grpc_transport_op) is a convenience to allow filters or
    103 // transports to schedule a closure related to a particular batch without
    104 // having to allocate memory.  The general pattern is to initialize the
    105 // closure with the callback arg set to the batch and extra_arg set to
    106 // whatever state is associated with the handler (e.g., the call element
    107 // or the transport stream object).
    108 //
    109 // Note that this can only be used by the current handler of a given
    110 // batch on the way down the stack (i.e., whichever filter or transport is
    111 // currently handling the batch).  Once a filter or transport passes control
    112 // of the batch to the next handler, it cannot depend on the contents of
    113 // this struct anymore, because the next handler may reuse it.
    114 typedef struct {
    115   void* extra_arg;  // handler state, e.g. call element or transport stream
    116   grpc_closure closure;  // closure whose callback arg is set to the batch
    117 } grpc_handler_private_op_data;
    118 
    119 typedef struct grpc_transport_stream_op_batch_payload
    120     grpc_transport_stream_op_batch_payload; /* forward declaration; the struct is defined further down */
    121 
    122 /* Transport stream op: a set of operations to perform on a transport
    123    against a single stream */
    124 typedef struct grpc_transport_stream_op_batch {
    125   /** Should be scheduled when all of the non-recv operations in the batch
    126       are complete.
    127 
    128       The recv ops (recv_initial_metadata, recv_message, and
    129       recv_trailing_metadata) each have their own callbacks.  If a batch
    130       contains both recv ops and non-recv ops, on_complete should be
    131       scheduled as soon as the non-recv ops are complete, regardless of
    132       whether or not the recv ops are complete.  If a batch contains
    133       only recv ops, on_complete can be null. */
    134   grpc_closure* on_complete;
    135 
    136   /** Values for the stream op (fields set are determined by flags below) */
    137   grpc_transport_stream_op_batch_payload* payload;
    138 
    139   /** Send initial metadata to the peer, from the provided metadata batch. */
    140   bool send_initial_metadata : 1;
    141 
    142   /** Send trailing metadata to the peer, from the provided metadata batch. */
    143   bool send_trailing_metadata : 1;
    144 
    145   /** Send message data to the peer, from the provided byte stream. */
    146   bool send_message : 1;
    147 
    148   /** Receive initial metadata from the stream, into provided metadata batch. */
    149   bool recv_initial_metadata : 1;
    150 
    151   /** Receive message data from the stream, into provided byte stream. */
    152   bool recv_message : 1;
    153 
    154   /** Receive trailing metadata from the stream, into provided metadata batch.
    155    */
    156   bool recv_trailing_metadata : 1;
    157 
    158   /** Cancel this stream with the provided error */
    159   bool cancel_stream : 1;
    160 
    161   /***************************************************************************
    162    * remaining fields are initialized and used at the discretion of the
    163    * current handler of the op */
    164 
    165   grpc_handler_private_op_data handler_private;
    166 } grpc_transport_stream_op_batch;
    167 
    168 struct grpc_transport_stream_op_batch_payload {  // Per-op arguments; each nested struct is named after its batch flag.
    169   struct {
    170     grpc_metadata_batch* send_initial_metadata;
    171     /** Iff send_initial_metadata != NULL, flags associated with
    172         send_initial_metadata: a bitfield of GRPC_INITIAL_METADATA_xxx */
    173     uint32_t send_initial_metadata_flags;
    174     // If non-NULL, will be set by the transport to the peer string (a char*).
    175     // The transport retains ownership of the string.
    176     // Note: This pointer may be used by the transport after the
    177     // send_initial_metadata op is completed.  It must remain valid
    178     // until the call is destroyed.
    179     gpr_atm* peer_string;
    180   } send_initial_metadata;
    181 
    182   struct {
    183     grpc_metadata_batch* send_trailing_metadata;
    184   } send_trailing_metadata;
    185 
    186   struct {
    187     // The transport (or a filter that decides to return a failure before
    188     // the op gets down to the transport) takes ownership.
    189     // The batch's on_complete will not be called until after the byte
    190     // stream is orphaned.
    191     grpc_core::OrphanablePtr<grpc_core::ByteStream> send_message;
    192   } send_message;
    193 
    194   struct {
    195     grpc_metadata_batch* recv_initial_metadata;
    196     // Flags are used only on the server side.  If non-null, will be set to
    197     // a bitfield of the GRPC_INITIAL_METADATA_xxx macros (e.g., to
    198     // indicate if the call is idempotent).
    199     uint32_t* recv_flags;
    200     /** Should be enqueued when initial metadata is ready to be processed. */
    201     grpc_closure* recv_initial_metadata_ready;
    202     // If not NULL, will be set to true if trailing metadata is
    203     // immediately available.  This may be a signal that we received a
    204     // Trailers-Only response.
    205     bool* trailing_metadata_available;
    206     // If non-NULL, will be set by the transport to the peer string (a char*).
    207     // The transport retains ownership of the string.
    208     // Note: This pointer may be used by the transport after the
    209     // recv_initial_metadata op is completed.  It must remain valid
    210     // until the call is destroyed.
    211     gpr_atm* peer_string;
    212   } recv_initial_metadata;
    213 
    214   struct {
    215     // Will be set by the transport to point to the byte stream
    216     // containing a received message.
    217     // Will be NULL if trailing metadata is received instead of a message.
    218     grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message;
    219     /** Should be enqueued when one message is ready to be processed. */
    220     grpc_closure* recv_message_ready;
    221   } recv_message;
    222 
    223   struct {
    224     grpc_metadata_batch* recv_trailing_metadata;
    225     grpc_transport_stream_stats* collect_stats;  // NOTE(review): presumably filled in by the transport - confirm
    226     /** Should be enqueued when trailing metadata is ready to be processed. */
    227     grpc_closure* recv_trailing_metadata_ready;
    228   } recv_trailing_metadata;
    229 
    230   /** Forcefully close this stream.
    231       The HTTP2 semantics should be:
    232       - server side: if cancel_error has GRPC_ERROR_INT_GRPC_STATUS, and
    233         trailing metadata has not been sent, send trailing metadata with status
    234         and message from cancel_error (use grpc_error_get_status) followed by
    235         a RST_STREAM with error=GRPC_CHTTP2_NO_ERROR to force a full close
    236       - at all other times: use grpc_error_get_status to get a status code, and
    237         convert to a HTTP2 error code using
    238         grpc_chttp2_grpc_status_to_http2_error. Send a RST_STREAM with this
    239         error. */
    240   struct {
    241     // Error contract: the transport that gets this op must cause cancel_error
    242     //                 to be unref'ed after processing it
    243     grpc_error* cancel_error;
    244   } cancel_stream;
    245 
    246   /* Indexes correspond to grpc_context_index enum values */
    247   grpc_call_context_element* context;
    248 };
    249 
    250 /** Transport op: a set of operations to perform on a transport as a whole */
    251 typedef struct grpc_transport_op {
    252   /** Called when processing of this op is done. */
    253   grpc_closure* on_consumed;
    254   /** Connectivity monitoring: set connectivity_state to NULL to unsubscribe. */
    255   grpc_closure* on_connectivity_state_change;
    256   grpc_connectivity_state* connectivity_state;
    257   /** Should the transport be disconnected?
    258    * Error contract: the transport that gets this op must cause
    259    *                 disconnect_with_error to be unref'ed after processing it */
    260   grpc_error* disconnect_with_error;
    261   /** What should the goaway contain?
    262    * Error contract: the transport that gets this op must cause
    263    *                 goaway_error to be unref'ed after processing it */
    264   grpc_error* goaway_error;
    265   /** Set the callback for accepting new streams;
    266       this is a permanent callback, unlike the other one-shot closures.
    267       If true, the callback is set to set_accept_stream_fn, with its
    268       user_data argument set to set_accept_stream_user_data */
    269   bool set_accept_stream;
    270   void (*set_accept_stream_fn)(void* user_data, grpc_transport* transport,
    271                                const void* server_data);
    272   void* set_accept_stream_user_data;
    273   /** Add this transport to a pollset */
    274   grpc_pollset* bind_pollset;
    275   /** Add this transport to a pollset_set */
    276   grpc_pollset_set* bind_pollset_set;
    277   /** Send a ping, if either on_initiate or on_ack is not NULL */
    278   struct {
    279     /** Ping may be delayed by the transport, on_initiate callback will be
    280         called when the ping is actually being sent. */
    281     grpc_closure* on_initiate;
    282     /** Called when the ping ack is received */
    283     grpc_closure* on_ack;
    284   } send_ping;
    285   // If true, will reset the channel's connection backoff.
    286   bool reset_connect_backoff;
    287 
    288   /***************************************************************************
    289    * remaining fields are initialized and used at the discretion of the
    290    * transport implementation */
    291 
    292   grpc_handler_private_op_data handler_private;
    293 } grpc_transport_op;
    294 
    295 /* Returns the amount of memory the caller must provide to store a
    296    grpc_stream for this transport. */
    297 size_t grpc_transport_stream_size(grpc_transport* transport);
    298 
    299 /* Initialize transport data for a stream.
    300    Returns 0 on success, any other (transport-defined) value for failure.
    301    May assume that stream contains all-zeros.
    302 
    303    Arguments:
    304      transport   - the transport on which to create this stream
    305      stream      - a pointer to uninitialized memory to initialize
    306      refcount/arena - NOTE(review): undocumented; confirm ownership/lifetime
    307      server_data - either NULL for a client initiated stream, or a pointer
    308                    supplied from the accept_stream callback function */
    309 int grpc_transport_init_stream(grpc_transport* transport, grpc_stream* stream,
    310                                grpc_stream_refcount* refcount,
    311                                const void* server_data, gpr_arena* arena);
    312 
    313 void grpc_transport_set_pops(grpc_transport* transport, grpc_stream* stream,
    314                              grpc_polling_entity* pollent); /* NOTE(review): presumably binds the stream to this polling entity - confirm */
    315 
    316 /* Destroy transport data for a stream.
    317    Requires: a recv_batch with final_state == GRPC_STREAM_CLOSED has been
    318    received by the up-layer. Must not be called in the same call stack as
    319    recv_frame.
    320 
    321    Arguments:
    322      transport - the transport on which this stream was created
    323      stream    - the grpc_stream to destroy (memory is still owned by the
    324                  caller, but any child memory must be cleaned up)
    325      then_schedule_closure - presumably scheduled once destroyed (confirm) */
    326 void grpc_transport_destroy_stream(grpc_transport* transport,
    327                                    grpc_stream* stream,
    328                                    grpc_closure* then_schedule_closure);
    329 
    330 void grpc_transport_stream_op_batch_finish_with_failure(
    331     grpc_transport_stream_op_batch* op, grpc_error* error,
    332     grpc_call_combiner* call_combiner); /* NOTE(review): undocumented; presumably fails the callbacks of \a op with \a error - confirm */
    333 
    334 char* grpc_transport_stream_op_batch_string(grpc_transport_stream_op_batch* op); /* NOTE(review): ownership of the returned string is not stated - confirm */
    335 char* grpc_transport_op_string(grpc_transport_op* op); /* NOTE(review): same question for this one */
    336 
    337 /* Send a batch of operations on a transport
    338 
    339    Takes ownership of any objects contained in op.
    340 
    341    Arguments:
    342      transport - the transport on which to perform the operations
    343      stream    - the stream on which to send the operations. This must be
    344                  non-NULL and previously initialized by the same transport.
    345      op        - a grpc_transport_stream_op_batch specifying the op to perform
    346    */
    347 void grpc_transport_perform_stream_op(grpc_transport* transport,
    348                                       grpc_stream* stream,
    349                                       grpc_transport_stream_op_batch* op);
    350 
    351 void grpc_transport_perform_op(grpc_transport* transport,
    352                                grpc_transport_op* op); /* Perform a transport-wide op; see grpc_transport_op. */
    353 
    354 /* Send a ping on a transport.
    355 
    356    Calls \a cb (with its user data) when a response is received. */
    357 void grpc_transport_ping(grpc_transport* transport, grpc_closure* cb);
    358 
    359 /* Advise peer of pending connection termination. */
    360 void grpc_transport_goaway(grpc_transport* transport, grpc_status_code status,
    361                            grpc_slice debug_data); /* NOTE(review): status/debug_data presumably go into the goaway - confirm */
    362 
    363 /* Destroy the transport. */
    364 void grpc_transport_destroy(grpc_transport* transport);
    365 
    366 /* Get the endpoint used by \a transport. */
    367 grpc_endpoint* grpc_transport_get_endpoint(grpc_transport* transport);
    368 
    369 /* Allocate a grpc_transport_op whose on_consumed closure is preconfigured to
    370    run \a on_consumed and then delete the returned transport op. */
    371 grpc_transport_op* grpc_make_transport_op(grpc_closure* on_consumed);
    372 /* Allocate a grpc_transport_stream_op_batch whose completion closure
    373    (NOTE(review): the batch has on_complete, not on_consumed) is preconfigured
    374    to run \a on_consumed and then delete the returned batch. */
    375 grpc_transport_stream_op_batch* grpc_make_transport_stream_op(
    376     grpc_closure* on_consumed);
    377 
    378 #endif /* GRPC_CORE_LIB_TRANSPORT_TRANSPORT_H */
    379