1 /* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #include <grpc/support/port_platform.h> 20 21 #include "src/core/lib/transport/transport.h" 22 23 #include <string.h> 24 25 #include <grpc/support/alloc.h> 26 #include <grpc/support/atm.h> 27 #include <grpc/support/log.h> 28 #include <grpc/support/sync.h> 29 30 #include "src/core/lib/gpr/string.h" 31 #include "src/core/lib/iomgr/executor.h" 32 #include "src/core/lib/slice/slice_internal.h" 33 #include "src/core/lib/slice/slice_string_helpers.h" 34 #include "src/core/lib/transport/transport_impl.h" 35 36 grpc_core::DebugOnlyTraceFlag grpc_trace_stream_refcount(false, 37 "stream_refcount"); 38 39 #ifndef NDEBUG 40 void grpc_stream_ref(grpc_stream_refcount* refcount, const char* reason) { 41 if (grpc_trace_stream_refcount.enabled()) { 42 gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count); 43 gpr_log(GPR_DEBUG, "%s %p:%p REF %" PRIdPTR "->%" PRIdPTR " %s", 44 refcount->object_type, refcount, refcount->destroy.cb_arg, val, 45 val + 1, reason); 46 } 47 #else 48 void grpc_stream_ref(grpc_stream_refcount* refcount) { 49 #endif 50 gpr_ref_non_zero(&refcount->refs); 51 } 52 53 #ifndef NDEBUG 54 void grpc_stream_unref(grpc_stream_refcount* refcount, const char* reason) { 55 if (grpc_trace_stream_refcount.enabled()) { 56 gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count); 57 gpr_log(GPR_DEBUG, "%s %p:%p UNREF %" PRIdPTR "->%" PRIdPTR " %s", 58 refcount->object_type, refcount, refcount->destroy.cb_arg, val, 59 val - 1, reason); 60 } 61 #else 62 void grpc_stream_unref(grpc_stream_refcount* refcount) { 63 #endif 64 if (gpr_unref(&refcount->refs)) { 65 if (grpc_core::ExecCtx::Get()->flags() & 66 GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP) { 67 /* Ick. 68 The thread we're running on MAY be owned (indirectly) by a call-stack. 69 If that's the case, destroying the call-stack MAY try to destroy the 70 thread, which is a tangled mess that we just don't want to ever have to 71 cope with. 72 Throw this over to the executor (on a core-owned thread) and process it 73 there. */ 74 refcount->destroy.scheduler = 75 grpc_executor_scheduler(GRPC_EXECUTOR_SHORT); 76 } 77 GRPC_CLOSURE_SCHED(&refcount->destroy, GRPC_ERROR_NONE); 78 } 79 } 80 81 #define STREAM_REF_FROM_SLICE_REF(p) \ 82 ((grpc_stream_refcount*)(((uint8_t*)p) - \ 83 offsetof(grpc_stream_refcount, slice_refcount))) 84 85 static void slice_stream_ref(void* p) { 86 #ifndef NDEBUG 87 grpc_stream_ref(STREAM_REF_FROM_SLICE_REF(p), "slice"); 88 #else 89 grpc_stream_ref(STREAM_REF_FROM_SLICE_REF(p)); 90 #endif 91 } 92 93 static void slice_stream_unref(void* p) { 94 #ifndef NDEBUG 95 grpc_stream_unref(STREAM_REF_FROM_SLICE_REF(p), "slice"); 96 #else 97 grpc_stream_unref(STREAM_REF_FROM_SLICE_REF(p)); 98 #endif 99 } 100 101 grpc_slice grpc_slice_from_stream_owned_buffer(grpc_stream_refcount* refcount, 102 void* buffer, size_t length) { 103 slice_stream_ref(&refcount->slice_refcount); 104 grpc_slice res; 105 res.refcount = &refcount->slice_refcount; 106 res.data.refcounted.bytes = static_cast<uint8_t*>(buffer); 107 res.data.refcounted.length = length; 108 return res; 109 } 110 111 static const grpc_slice_refcount_vtable stream_ref_slice_vtable = { 112 slice_stream_ref, /* ref */ 113 slice_stream_unref, /* unref */ 114 grpc_slice_default_eq_impl, /* eq */ 115 grpc_slice_default_hash_impl /* hash */ 116 }; 117 118 #ifndef NDEBUG 119 void grpc_stream_ref_init(grpc_stream_refcount* refcount, int initial_refs, 120 grpc_iomgr_cb_func cb, void* cb_arg, 121 const char* object_type) { 122 refcount->object_type = object_type; 123 #else 124 void grpc_stream_ref_init(grpc_stream_refcount* refcount, int initial_refs, 125 grpc_iomgr_cb_func cb, void* cb_arg) { 126 #endif 127 gpr_ref_init(&refcount->refs, initial_refs); 128 GRPC_CLOSURE_INIT(&refcount->destroy, cb, cb_arg, grpc_schedule_on_exec_ctx); 129 refcount->slice_refcount.vtable = &stream_ref_slice_vtable; 130 refcount->slice_refcount.sub_refcount = &refcount->slice_refcount; 131 } 132 133 static void move64(uint64_t* from, uint64_t* to) { 134 *to += *from; 135 *from = 0; 136 } 137 138 void grpc_transport_move_one_way_stats(grpc_transport_one_way_stats* from, 139 grpc_transport_one_way_stats* to) { 140 move64(&from->framing_bytes, &to->framing_bytes); 141 move64(&from->data_bytes, &to->data_bytes); 142 move64(&from->header_bytes, &to->header_bytes); 143 } 144 145 void grpc_transport_move_stats(grpc_transport_stream_stats* from, 146 grpc_transport_stream_stats* to) { 147 grpc_transport_move_one_way_stats(&from->incoming, &to->incoming); 148 grpc_transport_move_one_way_stats(&from->outgoing, &to->outgoing); 149 } 150 151 size_t grpc_transport_stream_size(grpc_transport* transport) { 152 return transport->vtable->sizeof_stream; 153 } 154 155 void grpc_transport_destroy(grpc_transport* transport) { 156 transport->vtable->destroy(transport); 157 } 158 159 int grpc_transport_init_stream(grpc_transport* transport, grpc_stream* stream, 160 grpc_stream_refcount* refcount, 161 const void* server_data, gpr_arena* arena) { 162 return transport->vtable->init_stream(transport, stream, refcount, 163 server_data, arena); 164 } 165 166 void grpc_transport_perform_stream_op(grpc_transport* transport, 167 grpc_stream* stream, 168 grpc_transport_stream_op_batch* op) { 169 transport->vtable->perform_stream_op(transport, stream, op); 170 } 171 172 void grpc_transport_perform_op(grpc_transport* transport, 173 grpc_transport_op* op) { 174 transport->vtable->perform_op(transport, op); 175 } 176 177 void grpc_transport_set_pops(grpc_transport* transport, grpc_stream* stream, 178 grpc_polling_entity* pollent) { 179 grpc_pollset* pollset; 180 grpc_pollset_set* pollset_set; 181 if ((pollset = grpc_polling_entity_pollset(pollent)) != nullptr) { 182 transport->vtable->set_pollset(transport, stream, pollset); 183 } else if ((pollset_set = grpc_polling_entity_pollset_set(pollent)) != 184 nullptr) { 185 transport->vtable->set_pollset_set(transport, stream, pollset_set); 186 } else { 187 // No-op for empty pollset. Empty pollset is possible when using 188 // non-fd-based event engines such as CFStream. 189 } 190 } 191 192 void grpc_transport_destroy_stream(grpc_transport* transport, 193 grpc_stream* stream, 194 grpc_closure* then_schedule_closure) { 195 transport->vtable->destroy_stream(transport, stream, then_schedule_closure); 196 } 197 198 grpc_endpoint* grpc_transport_get_endpoint(grpc_transport* transport) { 199 return transport->vtable->get_endpoint(transport); 200 } 201 202 // This comment should be sung to the tune of 203 // "Supercalifragilisticexpialidocious": 204 // 205 // grpc_transport_stream_op_batch_finish_with_failure 206 // is a function that must always unref cancel_error 207 // though it lives in lib, it handles transport stream ops sure 208 // it's grpc_transport_stream_op_batch_finish_with_failure 209 void grpc_transport_stream_op_batch_finish_with_failure( 210 grpc_transport_stream_op_batch* batch, grpc_error* error, 211 grpc_call_combiner* call_combiner) { 212 if (batch->send_message) { 213 batch->payload->send_message.send_message.reset(); 214 } 215 if (batch->cancel_stream) { 216 GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error); 217 } 218 // Construct a list of closures to execute. 219 grpc_core::CallCombinerClosureList closures; 220 if (batch->recv_initial_metadata) { 221 closures.Add( 222 batch->payload->recv_initial_metadata.recv_initial_metadata_ready, 223 GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready"); 224 } 225 if (batch->recv_message) { 226 closures.Add(batch->payload->recv_message.recv_message_ready, 227 GRPC_ERROR_REF(error), "failing recv_message_ready"); 228 } 229 if (batch->recv_trailing_metadata) { 230 closures.Add( 231 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready, 232 GRPC_ERROR_REF(error), "failing recv_trailing_metadata_ready"); 233 } 234 if (batch->on_complete != nullptr) { 235 closures.Add(batch->on_complete, GRPC_ERROR_REF(error), 236 "failing on_complete"); 237 } 238 // Execute closures. 239 closures.RunClosures(call_combiner); 240 GRPC_ERROR_UNREF(error); 241 } 242 243 typedef struct { 244 grpc_closure outer_on_complete; 245 grpc_closure* inner_on_complete; 246 grpc_transport_op op; 247 } made_transport_op; 248 249 static void destroy_made_transport_op(void* arg, grpc_error* error) { 250 made_transport_op* op = static_cast<made_transport_op*>(arg); 251 GRPC_CLOSURE_SCHED(op->inner_on_complete, GRPC_ERROR_REF(error)); 252 gpr_free(op); 253 } 254 255 grpc_transport_op* grpc_make_transport_op(grpc_closure* on_complete) { 256 made_transport_op* op = 257 static_cast<made_transport_op*>(gpr_malloc(sizeof(*op))); 258 GRPC_CLOSURE_INIT(&op->outer_on_complete, destroy_made_transport_op, op, 259 grpc_schedule_on_exec_ctx); 260 op->inner_on_complete = on_complete; 261 memset(&op->op, 0, sizeof(op->op)); 262 op->op.on_consumed = &op->outer_on_complete; 263 return &op->op; 264 } 265 266 typedef struct { 267 grpc_closure outer_on_complete; 268 grpc_closure* inner_on_complete; 269 grpc_transport_stream_op_batch op; 270 grpc_transport_stream_op_batch_payload payload; 271 } made_transport_stream_op; 272 273 static void destroy_made_transport_stream_op(void* arg, grpc_error* error) { 274 made_transport_stream_op* op = static_cast<made_transport_stream_op*>(arg); 275 grpc_closure* c = op->inner_on_complete; 276 gpr_free(op); 277 GRPC_CLOSURE_RUN(c, GRPC_ERROR_REF(error)); 278 } 279 280 grpc_transport_stream_op_batch* grpc_make_transport_stream_op( 281 grpc_closure* on_complete) { 282 made_transport_stream_op* op = 283 static_cast<made_transport_stream_op*>(gpr_zalloc(sizeof(*op))); 284 op->op.payload = &op->payload; 285 GRPC_CLOSURE_INIT(&op->outer_on_complete, destroy_made_transport_stream_op, 286 op, grpc_schedule_on_exec_ctx); 287 op->inner_on_complete = on_complete; 288 op->op.on_complete = &op->outer_on_complete; 289 return &op->op; 290 } 291