Home | History | Annotate | Download | only in transport
      1 /*
      2  *
      3  * Copyright 2015 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #include <grpc/support/port_platform.h>
     20 
     21 #include "src/core/lib/transport/transport.h"
     22 
     23 #include <string.h>
     24 
     25 #include <grpc/support/alloc.h>
     26 #include <grpc/support/atm.h>
     27 #include <grpc/support/log.h>
     28 #include <grpc/support/sync.h>
     29 
     30 #include "src/core/lib/gpr/string.h"
     31 #include "src/core/lib/iomgr/executor.h"
     32 #include "src/core/lib/slice/slice_internal.h"
     33 #include "src/core/lib/slice/slice_string_helpers.h"
     34 #include "src/core/lib/transport/transport_impl.h"
     35 
/* Trace flag "stream_refcount" (debug-only): when enabled, stream ref/unref
   transitions are logged by grpc_stream_ref/grpc_stream_unref below. */
grpc_core::DebugOnlyTraceFlag grpc_trace_stream_refcount(false,
                                                         "stream_refcount");
     38 
#ifndef NDEBUG
/* Takes a ref on a stream refcount. Debug builds carry a |reason| string and
   log the before->after count when the stream_refcount trace flag is on. */
void grpc_stream_ref(grpc_stream_refcount* refcount, const char* reason) {
  if (grpc_trace_stream_refcount.enabled()) {
    /* no_barrier: the value is only used for the log line, staleness is ok */
    gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count);
    gpr_log(GPR_DEBUG, "%s %p:%p   REF %" PRIdPTR "->%" PRIdPTR " %s",
            refcount->object_type, refcount, refcount->destroy.cb_arg, val,
            val + 1, reason);
  }
#else
void grpc_stream_ref(grpc_stream_refcount* refcount) {
#endif
  /* ref-from-zero is illegal: the stream would already be being destroyed */
  gpr_ref_non_zero(&refcount->refs);
}
     52 
#ifndef NDEBUG
/* Drops a ref on a stream refcount; when the last ref is released, schedules
   the refcount's destroy closure. Debug builds carry a |reason| string and
   log the before->after count when the stream_refcount trace flag is on. */
void grpc_stream_unref(grpc_stream_refcount* refcount, const char* reason) {
  if (grpc_trace_stream_refcount.enabled()) {
    /* no_barrier: the value is only used for the log line, staleness is ok */
    gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count);
    gpr_log(GPR_DEBUG, "%s %p:%p UNREF %" PRIdPTR "->%" PRIdPTR " %s",
            refcount->object_type, refcount, refcount->destroy.cb_arg, val,
            val - 1, reason);
  }
#else
void grpc_stream_unref(grpc_stream_refcount* refcount) {
#endif
  if (gpr_unref(&refcount->refs)) {
    if (grpc_core::ExecCtx::Get()->flags() &
        GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP) {
      /* Ick.
         The thread we're running on MAY be owned (indirectly) by a call-stack.
         If that's the case, destroying the call-stack MAY try to destroy the
         thread, which is a tangled mess that we just don't want to ever have to
         cope with.
         Throw this over to the executor (on a core-owned thread) and process it
         there. */
      refcount->destroy.scheduler =
          grpc_executor_scheduler(GRPC_EXECUTOR_SHORT);
    }
    GRPC_CLOSURE_SCHED(&refcount->destroy, GRPC_ERROR_NONE);
  }
}
     80 
     81 #define STREAM_REF_FROM_SLICE_REF(p)       \
     82   ((grpc_stream_refcount*)(((uint8_t*)p) - \
     83                            offsetof(grpc_stream_refcount, slice_refcount)))
     84 
     85 static void slice_stream_ref(void* p) {
     86 #ifndef NDEBUG
     87   grpc_stream_ref(STREAM_REF_FROM_SLICE_REF(p), "slice");
     88 #else
     89   grpc_stream_ref(STREAM_REF_FROM_SLICE_REF(p));
     90 #endif
     91 }
     92 
     93 static void slice_stream_unref(void* p) {
     94 #ifndef NDEBUG
     95   grpc_stream_unref(STREAM_REF_FROM_SLICE_REF(p), "slice");
     96 #else
     97   grpc_stream_unref(STREAM_REF_FROM_SLICE_REF(p));
     98 #endif
     99 }
    100 
    101 grpc_slice grpc_slice_from_stream_owned_buffer(grpc_stream_refcount* refcount,
    102                                                void* buffer, size_t length) {
    103   slice_stream_ref(&refcount->slice_refcount);
    104   grpc_slice res;
    105   res.refcount = &refcount->slice_refcount;
    106   res.data.refcounted.bytes = static_cast<uint8_t*>(buffer);
    107   res.data.refcounted.length = length;
    108   return res;
    109 }
    110 
/* Slice refcount vtable whose ref/unref forward to the owning stream's
   refcount; eq and hash use the default content-based implementations. */
static const grpc_slice_refcount_vtable stream_ref_slice_vtable = {
    slice_stream_ref,            /* ref */
    slice_stream_unref,          /* unref */
    grpc_slice_default_eq_impl,  /* eq */
    grpc_slice_default_hash_impl /* hash */
};
    117 
#ifndef NDEBUG
/* Initializes |refcount| with |initial_refs| refs. |cb|/|cb_arg| form the
   destroy closure scheduled (on the exec_ctx scheduler) when the count hits
   zero. Debug builds also record |object_type| for trace logging. */
void grpc_stream_ref_init(grpc_stream_refcount* refcount, int initial_refs,
                          grpc_iomgr_cb_func cb, void* cb_arg,
                          const char* object_type) {
  refcount->object_type = object_type;
#else
void grpc_stream_ref_init(grpc_stream_refcount* refcount, int initial_refs,
                          grpc_iomgr_cb_func cb, void* cb_arg) {
#endif
  gpr_ref_init(&refcount->refs, initial_refs);
  GRPC_CLOSURE_INIT(&refcount->destroy, cb, cb_arg, grpc_schedule_on_exec_ctx);
  /* The embedded slice refcount forwards slice refs/unrefs to this stream
     refcount (see stream_ref_slice_vtable). */
  refcount->slice_refcount.vtable = &stream_ref_slice_vtable;
  refcount->slice_refcount.sub_refcount = &refcount->slice_refcount;
}
    132 
    133 static void move64(uint64_t* from, uint64_t* to) {
    134   *to += *from;
    135   *from = 0;
    136 }
    137 
    138 void grpc_transport_move_one_way_stats(grpc_transport_one_way_stats* from,
    139                                        grpc_transport_one_way_stats* to) {
    140   move64(&from->framing_bytes, &to->framing_bytes);
    141   move64(&from->data_bytes, &to->data_bytes);
    142   move64(&from->header_bytes, &to->header_bytes);
    143 }
    144 
    145 void grpc_transport_move_stats(grpc_transport_stream_stats* from,
    146                                grpc_transport_stream_stats* to) {
    147   grpc_transport_move_one_way_stats(&from->incoming, &to->incoming);
    148   grpc_transport_move_one_way_stats(&from->outgoing, &to->outgoing);
    149 }
    150 
    151 size_t grpc_transport_stream_size(grpc_transport* transport) {
    152   return transport->vtable->sizeof_stream;
    153 }
    154 
    155 void grpc_transport_destroy(grpc_transport* transport) {
    156   transport->vtable->destroy(transport);
    157 }
    158 
    159 int grpc_transport_init_stream(grpc_transport* transport, grpc_stream* stream,
    160                                grpc_stream_refcount* refcount,
    161                                const void* server_data, gpr_arena* arena) {
    162   return transport->vtable->init_stream(transport, stream, refcount,
    163                                         server_data, arena);
    164 }
    165 
    166 void grpc_transport_perform_stream_op(grpc_transport* transport,
    167                                       grpc_stream* stream,
    168                                       grpc_transport_stream_op_batch* op) {
    169   transport->vtable->perform_stream_op(transport, stream, op);
    170 }
    171 
    172 void grpc_transport_perform_op(grpc_transport* transport,
    173                                grpc_transport_op* op) {
    174   transport->vtable->perform_op(transport, op);
    175 }
    176 
    177 void grpc_transport_set_pops(grpc_transport* transport, grpc_stream* stream,
    178                              grpc_polling_entity* pollent) {
    179   grpc_pollset* pollset;
    180   grpc_pollset_set* pollset_set;
    181   if ((pollset = grpc_polling_entity_pollset(pollent)) != nullptr) {
    182     transport->vtable->set_pollset(transport, stream, pollset);
    183   } else if ((pollset_set = grpc_polling_entity_pollset_set(pollent)) !=
    184              nullptr) {
    185     transport->vtable->set_pollset_set(transport, stream, pollset_set);
    186   } else {
    187     // No-op for empty pollset. Empty pollset is possible when using
    188     // non-fd-based event engines such as CFStream.
    189   }
    190 }
    191 
    192 void grpc_transport_destroy_stream(grpc_transport* transport,
    193                                    grpc_stream* stream,
    194                                    grpc_closure* then_schedule_closure) {
    195   transport->vtable->destroy_stream(transport, stream, then_schedule_closure);
    196 }
    197 
    198 grpc_endpoint* grpc_transport_get_endpoint(grpc_transport* transport) {
    199   return transport->vtable->get_endpoint(transport);
    200 }
    201 
    202 // This comment should be sung to the tune of
    203 // "Supercalifragilisticexpialidocious":
    204 //
    205 // grpc_transport_stream_op_batch_finish_with_failure
    206 // is a function that must always unref cancel_error
    207 // though it lives in lib, it handles transport stream ops sure
    208 // it's grpc_transport_stream_op_batch_finish_with_failure
    209 void grpc_transport_stream_op_batch_finish_with_failure(
    210     grpc_transport_stream_op_batch* batch, grpc_error* error,
    211     grpc_call_combiner* call_combiner) {
    212   if (batch->send_message) {
    213     batch->payload->send_message.send_message.reset();
    214   }
    215   if (batch->cancel_stream) {
    216     GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error);
    217   }
    218   // Construct a list of closures to execute.
    219   grpc_core::CallCombinerClosureList closures;
    220   if (batch->recv_initial_metadata) {
    221     closures.Add(
    222         batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
    223         GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready");
    224   }
    225   if (batch->recv_message) {
    226     closures.Add(batch->payload->recv_message.recv_message_ready,
    227                  GRPC_ERROR_REF(error), "failing recv_message_ready");
    228   }
    229   if (batch->recv_trailing_metadata) {
    230     closures.Add(
    231         batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
    232         GRPC_ERROR_REF(error), "failing recv_trailing_metadata_ready");
    233   }
    234   if (batch->on_complete != nullptr) {
    235     closures.Add(batch->on_complete, GRPC_ERROR_REF(error),
    236                  "failing on_complete");
    237   }
    238   // Execute closures.
    239   closures.RunClosures(call_combiner);
    240   GRPC_ERROR_UNREF(error);
    241 }
    242 
/* Heap-allocated wrapper produced by grpc_make_transport_op: it interposes
   outer_on_complete as the op's on_consumed so the allocation can be freed
   before chaining to the caller's closure. */
typedef struct {
  grpc_closure outer_on_complete;  /* installed as op.on_consumed */
  grpc_closure* inner_on_complete; /* caller-supplied closure, run afterwards */
  grpc_transport_op op;            /* the op handed back to the caller */
} made_transport_op;
    248 
    249 static void destroy_made_transport_op(void* arg, grpc_error* error) {
    250   made_transport_op* op = static_cast<made_transport_op*>(arg);
    251   GRPC_CLOSURE_SCHED(op->inner_on_complete, GRPC_ERROR_REF(error));
    252   gpr_free(op);
    253 }
    254 
    255 grpc_transport_op* grpc_make_transport_op(grpc_closure* on_complete) {
    256   made_transport_op* op =
    257       static_cast<made_transport_op*>(gpr_malloc(sizeof(*op)));
    258   GRPC_CLOSURE_INIT(&op->outer_on_complete, destroy_made_transport_op, op,
    259                     grpc_schedule_on_exec_ctx);
    260   op->inner_on_complete = on_complete;
    261   memset(&op->op, 0, sizeof(op->op));
    262   op->op.on_consumed = &op->outer_on_complete;
    263   return &op->op;
    264 }
    265 
/* Heap-allocated wrapper produced by grpc_make_transport_stream_op: it
   interposes outer_on_complete as the batch's on_complete so the allocation
   can be freed before chaining to the caller's closure. */
typedef struct {
  grpc_closure outer_on_complete;  /* installed as op.on_complete */
  grpc_closure* inner_on_complete; /* caller-supplied closure, run afterwards */
  grpc_transport_stream_op_batch op;          /* batch handed to the caller */
  grpc_transport_stream_op_batch_payload payload; /* op.payload points here */
} made_transport_stream_op;
    272 
    273 static void destroy_made_transport_stream_op(void* arg, grpc_error* error) {
    274   made_transport_stream_op* op = static_cast<made_transport_stream_op*>(arg);
    275   grpc_closure* c = op->inner_on_complete;
    276   gpr_free(op);
    277   GRPC_CLOSURE_RUN(c, GRPC_ERROR_REF(error));
    278 }
    279 
    280 grpc_transport_stream_op_batch* grpc_make_transport_stream_op(
    281     grpc_closure* on_complete) {
    282   made_transport_stream_op* op =
    283       static_cast<made_transport_stream_op*>(gpr_zalloc(sizeof(*op)));
    284   op->op.payload = &op->payload;
    285   GRPC_CLOSURE_INIT(&op->outer_on_complete, destroy_made_transport_stream_op,
    286                     op, grpc_schedule_on_exec_ctx);
    287   op->inner_on_complete = on_complete;
    288   op->op.on_complete = &op->outer_on_complete;
    289   return &op->op;
    290 }
    291