1 /* 2 * 3 * Copyright 2017 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #include <grpc/support/port_platform.h> 20 21 #include "src/core/lib/iomgr/call_combiner.h" 22 23 #include <inttypes.h> 24 25 #include <grpc/support/log.h> 26 #include "src/core/lib/debug/stats.h" 27 #include "src/core/lib/profiling/timers.h" 28 29 grpc_core::TraceFlag grpc_call_combiner_trace(false, "call_combiner"); 30 31 static grpc_error* decode_cancel_state_error(gpr_atm cancel_state) { 32 if (cancel_state & 1) { 33 return (grpc_error*)(cancel_state & ~static_cast<gpr_atm>(1)); 34 } 35 return GRPC_ERROR_NONE; 36 } 37 38 static gpr_atm encode_cancel_state_error(grpc_error* error) { 39 return static_cast<gpr_atm>(1) | (gpr_atm)error; 40 } 41 42 void grpc_call_combiner_init(grpc_call_combiner* call_combiner) { 43 gpr_mpscq_init(&call_combiner->queue); 44 } 45 46 void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner) { 47 gpr_mpscq_destroy(&call_combiner->queue); 48 GRPC_ERROR_UNREF(decode_cancel_state_error(call_combiner->cancel_state)); 49 } 50 51 #ifndef NDEBUG 52 #define DEBUG_ARGS , const char *file, int line 53 #define DEBUG_FMT_STR "%s:%d: " 54 #define DEBUG_FMT_ARGS , file, line 55 #else 56 #define DEBUG_ARGS 57 #define DEBUG_FMT_STR 58 #define DEBUG_FMT_ARGS 59 #endif 60 61 void grpc_call_combiner_start(grpc_call_combiner* call_combiner, 62 grpc_closure* closure, 63 grpc_error* error DEBUG_ARGS, 64 const char* reason) { 65 GPR_TIMER_SCOPE("call_combiner_start", 0); 66 if (grpc_call_combiner_trace.enabled()) { 67 gpr_log(GPR_INFO, 68 "==> grpc_call_combiner_start() [%p] closure=%p [" DEBUG_FMT_STR 69 "%s] error=%s", 70 call_combiner, closure DEBUG_FMT_ARGS, reason, 71 grpc_error_string(error)); 72 } 73 size_t prev_size = static_cast<size_t>( 74 gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)1)); 75 if (grpc_call_combiner_trace.enabled()) { 76 gpr_log(GPR_INFO, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size, 77 prev_size + 1); 78 } 79 GRPC_STATS_INC_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS(); 80 if (prev_size == 0) { 81 GRPC_STATS_INC_CALL_COMBINER_LOCKS_INITIATED(); 82 83 GPR_TIMER_MARK("call_combiner_initiate", 0); 84 if (grpc_call_combiner_trace.enabled()) { 85 gpr_log(GPR_INFO, " EXECUTING IMMEDIATELY"); 86 } 87 // Queue was empty, so execute this closure immediately. 88 GRPC_CLOSURE_SCHED(closure, error); 89 } else { 90 if (grpc_call_combiner_trace.enabled()) { 91 gpr_log(GPR_INFO, " QUEUING"); 92 } 93 // Queue was not empty, so add closure to queue. 94 closure->error_data.error = error; 95 gpr_mpscq_push(&call_combiner->queue, 96 reinterpret_cast<gpr_mpscq_node*>(closure)); 97 } 98 } 99 100 void grpc_call_combiner_stop(grpc_call_combiner* call_combiner DEBUG_ARGS, 101 const char* reason) { 102 GPR_TIMER_SCOPE("call_combiner_stop", 0); 103 if (grpc_call_combiner_trace.enabled()) { 104 gpr_log(GPR_INFO, 105 "==> grpc_call_combiner_stop() [%p] [" DEBUG_FMT_STR "%s]", 106 call_combiner DEBUG_FMT_ARGS, reason); 107 } 108 size_t prev_size = static_cast<size_t>( 109 gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)-1)); 110 if (grpc_call_combiner_trace.enabled()) { 111 gpr_log(GPR_INFO, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size, 112 prev_size - 1); 113 } 114 GPR_ASSERT(prev_size >= 1); 115 if (prev_size > 1) { 116 while (true) { 117 if (grpc_call_combiner_trace.enabled()) { 118 gpr_log(GPR_INFO, " checking queue"); 119 } 120 bool empty; 121 grpc_closure* closure = reinterpret_cast<grpc_closure*>( 122 gpr_mpscq_pop_and_check_end(&call_combiner->queue, &empty)); 123 if (closure == nullptr) { 124 // This can happen either due to a race condition within the mpscq 125 // code or because of a race with grpc_call_combiner_start(). 126 if (grpc_call_combiner_trace.enabled()) { 127 gpr_log(GPR_INFO, " queue returned no result; checking again"); 128 } 129 continue; 130 } 131 if (grpc_call_combiner_trace.enabled()) { 132 gpr_log(GPR_INFO, " EXECUTING FROM QUEUE: closure=%p error=%s", 133 closure, grpc_error_string(closure->error_data.error)); 134 } 135 GRPC_CLOSURE_SCHED(closure, closure->error_data.error); 136 break; 137 } 138 } else if (grpc_call_combiner_trace.enabled()) { 139 gpr_log(GPR_INFO, " queue empty"); 140 } 141 } 142 143 void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner, 144 grpc_closure* closure) { 145 GRPC_STATS_INC_CALL_COMBINER_SET_NOTIFY_ON_CANCEL(); 146 while (true) { 147 // Decode original state. 148 gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state); 149 grpc_error* original_error = decode_cancel_state_error(original_state); 150 // If error is set, invoke the cancellation closure immediately. 151 // Otherwise, store the new closure. 152 if (original_error != GRPC_ERROR_NONE) { 153 if (grpc_call_combiner_trace.enabled()) { 154 gpr_log(GPR_INFO, 155 "call_combiner=%p: scheduling notify_on_cancel callback=%p " 156 "for pre-existing cancellation", 157 call_combiner, closure); 158 } 159 GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_REF(original_error)); 160 break; 161 } else { 162 if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state, 163 (gpr_atm)closure)) { 164 if (grpc_call_combiner_trace.enabled()) { 165 gpr_log(GPR_INFO, "call_combiner=%p: setting notify_on_cancel=%p", 166 call_combiner, closure); 167 } 168 // If we replaced an earlier closure, invoke the original 169 // closure with GRPC_ERROR_NONE. This allows callers to clean 170 // up any resources they may be holding for the callback. 171 if (original_state != 0) { 172 closure = (grpc_closure*)original_state; 173 if (grpc_call_combiner_trace.enabled()) { 174 gpr_log(GPR_INFO, 175 "call_combiner=%p: scheduling old cancel callback=%p", 176 call_combiner, closure); 177 } 178 GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE); 179 } 180 break; 181 } 182 } 183 // cas failed, try again. 184 } 185 } 186 187 void grpc_call_combiner_cancel(grpc_call_combiner* call_combiner, 188 grpc_error* error) { 189 GRPC_STATS_INC_CALL_COMBINER_CANCELLED(); 190 while (true) { 191 gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state); 192 grpc_error* original_error = decode_cancel_state_error(original_state); 193 if (original_error != GRPC_ERROR_NONE) { 194 GRPC_ERROR_UNREF(error); 195 break; 196 } 197 if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state, 198 encode_cancel_state_error(error))) { 199 if (original_state != 0) { 200 grpc_closure* notify_on_cancel = (grpc_closure*)original_state; 201 if (grpc_call_combiner_trace.enabled()) { 202 gpr_log(GPR_INFO, 203 "call_combiner=%p: scheduling notify_on_cancel callback=%p", 204 call_combiner, notify_on_cancel); 205 } 206 GRPC_CLOSURE_SCHED(notify_on_cancel, GRPC_ERROR_REF(error)); 207 } 208 break; 209 } 210 // cas failed, try again. 211 } 212 } 213