1 /* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #include <grpc/support/port_platform.h> 20 21 #include "src/core/lib/transport/connectivity_state.h" 22 23 #include <string.h> 24 25 #include <grpc/support/alloc.h> 26 #include <grpc/support/log.h> 27 #include <grpc/support/string_util.h> 28 29 grpc_core::TraceFlag grpc_connectivity_state_trace(false, "connectivity_state"); 30 31 const char* grpc_connectivity_state_name(grpc_connectivity_state state) { 32 switch (state) { 33 case GRPC_CHANNEL_IDLE: 34 return "IDLE"; 35 case GRPC_CHANNEL_CONNECTING: 36 return "CONNECTING"; 37 case GRPC_CHANNEL_READY: 38 return "READY"; 39 case GRPC_CHANNEL_TRANSIENT_FAILURE: 40 return "TRANSIENT_FAILURE"; 41 case GRPC_CHANNEL_SHUTDOWN: 42 return "SHUTDOWN"; 43 } 44 GPR_UNREACHABLE_CODE(return "UNKNOWN"); 45 } 46 47 void grpc_connectivity_state_init(grpc_connectivity_state_tracker* tracker, 48 grpc_connectivity_state init_state, 49 const char* name) { 50 gpr_atm_no_barrier_store(&tracker->current_state_atm, init_state); 51 tracker->current_error = GRPC_ERROR_NONE; 52 tracker->watchers = nullptr; 53 tracker->name = gpr_strdup(name); 54 } 55 56 void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker* tracker) { 57 grpc_error* error; 58 grpc_connectivity_state_watcher* w; 59 while ((w = tracker->watchers)) { 60 tracker->watchers = w->next; 61 62 if (GRPC_CHANNEL_SHUTDOWN != *w->current) { 63 *w->current = GRPC_CHANNEL_SHUTDOWN; 64 error = GRPC_ERROR_NONE; 65 } else { 66 error = 67 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutdown connectivity owner"); 68 } 69 GRPC_CLOSURE_SCHED(w->notify, error); 70 gpr_free(w); 71 } 72 GRPC_ERROR_UNREF(tracker->current_error); 73 gpr_free(tracker->name); 74 } 75 76 grpc_connectivity_state grpc_connectivity_state_check( 77 grpc_connectivity_state_tracker* tracker) { 78 grpc_connectivity_state cur = static_cast<grpc_connectivity_state>( 79 gpr_atm_no_barrier_load(&tracker->current_state_atm)); 80 if (grpc_connectivity_state_trace.enabled()) { 81 gpr_log(GPR_INFO, "CONWATCH: %p %s: get %s", tracker, tracker->name, 82 grpc_connectivity_state_name(cur)); 83 } 84 return cur; 85 } 86 87 grpc_connectivity_state grpc_connectivity_state_get( 88 grpc_connectivity_state_tracker* tracker, grpc_error** error) { 89 grpc_connectivity_state cur = static_cast<grpc_connectivity_state>( 90 gpr_atm_no_barrier_load(&tracker->current_state_atm)); 91 if (grpc_connectivity_state_trace.enabled()) { 92 gpr_log(GPR_INFO, "CONWATCH: %p %s: get %s", tracker, tracker->name, 93 grpc_connectivity_state_name(cur)); 94 } 95 if (error != nullptr) { 96 *error = GRPC_ERROR_REF(tracker->current_error); 97 } 98 return cur; 99 } 100 101 bool grpc_connectivity_state_has_watchers( 102 grpc_connectivity_state_tracker* connectivity_state) { 103 return connectivity_state->watchers != nullptr; 104 } 105 106 bool grpc_connectivity_state_notify_on_state_change( 107 grpc_connectivity_state_tracker* tracker, grpc_connectivity_state* current, 108 grpc_closure* notify) { 109 grpc_connectivity_state cur = static_cast<grpc_connectivity_state>( 110 gpr_atm_no_barrier_load(&tracker->current_state_atm)); 111 if (grpc_connectivity_state_trace.enabled()) { 112 if (current == nullptr) { 113 gpr_log(GPR_INFO, "CONWATCH: %p %s: unsubscribe notify=%p", tracker, 114 tracker->name, notify); 115 } else { 116 gpr_log(GPR_INFO, "CONWATCH: %p %s: from %s [cur=%s] notify=%p", tracker, 117 tracker->name, grpc_connectivity_state_name(*current), 118 grpc_connectivity_state_name(cur), notify); 119 } 120 } 121 if (current == nullptr) { 122 grpc_connectivity_state_watcher* w = tracker->watchers; 123 if (w != nullptr && w->notify == notify) { 124 GRPC_CLOSURE_SCHED(notify, GRPC_ERROR_CANCELLED); 125 tracker->watchers = w->next; 126 gpr_free(w); 127 return false; 128 } 129 while (w != nullptr) { 130 grpc_connectivity_state_watcher* rm_candidate = w->next; 131 if (rm_candidate != nullptr && rm_candidate->notify == notify) { 132 GRPC_CLOSURE_SCHED(notify, GRPC_ERROR_CANCELLED); 133 w->next = w->next->next; 134 gpr_free(rm_candidate); 135 return false; 136 } 137 w = w->next; 138 } 139 return false; 140 } else { 141 if (cur != *current) { 142 *current = cur; 143 GRPC_CLOSURE_SCHED(notify, GRPC_ERROR_REF(tracker->current_error)); 144 } else { 145 grpc_connectivity_state_watcher* w = 146 static_cast<grpc_connectivity_state_watcher*>(gpr_malloc(sizeof(*w))); 147 w->current = current; 148 w->notify = notify; 149 w->next = tracker->watchers; 150 tracker->watchers = w; 151 } 152 return cur == GRPC_CHANNEL_IDLE; 153 } 154 } 155 156 void grpc_connectivity_state_set(grpc_connectivity_state_tracker* tracker, 157 grpc_connectivity_state state, 158 grpc_error* error, const char* reason) { 159 grpc_connectivity_state cur = static_cast<grpc_connectivity_state>( 160 gpr_atm_no_barrier_load(&tracker->current_state_atm)); 161 grpc_connectivity_state_watcher* w; 162 if (grpc_connectivity_state_trace.enabled()) { 163 const char* error_string = grpc_error_string(error); 164 gpr_log(GPR_INFO, "SET: %p %s: %s --> %s [%s] error=%p %s", tracker, 165 tracker->name, grpc_connectivity_state_name(cur), 166 grpc_connectivity_state_name(state), reason, error, error_string); 167 } 168 switch (state) { 169 case GRPC_CHANNEL_CONNECTING: 170 case GRPC_CHANNEL_IDLE: 171 case GRPC_CHANNEL_READY: 172 GPR_ASSERT(error == GRPC_ERROR_NONE); 173 break; 174 case GRPC_CHANNEL_SHUTDOWN: 175 case GRPC_CHANNEL_TRANSIENT_FAILURE: 176 GPR_ASSERT(error != GRPC_ERROR_NONE); 177 break; 178 } 179 GRPC_ERROR_UNREF(tracker->current_error); 180 tracker->current_error = error; 181 if (cur == state) { 182 return; 183 } 184 GPR_ASSERT(cur != GRPC_CHANNEL_SHUTDOWN); 185 gpr_atm_no_barrier_store(&tracker->current_state_atm, state); 186 while ((w = tracker->watchers) != nullptr) { 187 *w->current = state; 188 tracker->watchers = w->next; 189 if (grpc_connectivity_state_trace.enabled()) { 190 gpr_log(GPR_INFO, "NOTIFY: %p %s: %p", tracker, tracker->name, w->notify); 191 } 192 GRPC_CLOSURE_SCHED(w->notify, GRPC_ERROR_REF(tracker->current_error)); 193 gpr_free(w); 194 } 195 } 196