Home | History | Annotate | Download | only in transport
      1 /*
      2  *
      3  * Copyright 2015 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #include <grpc/support/port_platform.h>
     20 
     21 #include "src/core/lib/transport/connectivity_state.h"
     22 
     23 #include <string.h>
     24 
     25 #include <grpc/support/alloc.h>
     26 #include <grpc/support/log.h>
     27 #include <grpc/support/string_util.h>
     28 
     // Trace flag named "connectivity_state", constructed with `false`
     // (presumably meaning disabled by default -- confirm against TraceFlag).
     // The functions in this file only emit their CONWATCH/SET/NOTIFY log lines
     // when this flag's enabled() returns true.
     29 grpc_core::TraceFlag grpc_connectivity_state_trace(false, "connectivity_state");
     30 
     31 const char* grpc_connectivity_state_name(grpc_connectivity_state state) {
     32   switch (state) {
     33     case GRPC_CHANNEL_IDLE:
     34       return "IDLE";
     35     case GRPC_CHANNEL_CONNECTING:
     36       return "CONNECTING";
     37     case GRPC_CHANNEL_READY:
     38       return "READY";
     39     case GRPC_CHANNEL_TRANSIENT_FAILURE:
     40       return "TRANSIENT_FAILURE";
     41     case GRPC_CHANNEL_SHUTDOWN:
     42       return "SHUTDOWN";
     43   }
     44   GPR_UNREACHABLE_CODE(return "UNKNOWN");
     45 }
     46 
     47 void grpc_connectivity_state_init(grpc_connectivity_state_tracker* tracker,
     48                                   grpc_connectivity_state init_state,
     49                                   const char* name) {
     50   gpr_atm_no_barrier_store(&tracker->current_state_atm, init_state);
     51   tracker->current_error = GRPC_ERROR_NONE;
     52   tracker->watchers = nullptr;
     53   tracker->name = gpr_strdup(name);
     54 }
     55 
     56 void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker* tracker) {
     57   grpc_error* error;
     58   grpc_connectivity_state_watcher* w;
     59   while ((w = tracker->watchers)) {
     60     tracker->watchers = w->next;
     61 
     62     if (GRPC_CHANNEL_SHUTDOWN != *w->current) {
     63       *w->current = GRPC_CHANNEL_SHUTDOWN;
     64       error = GRPC_ERROR_NONE;
     65     } else {
     66       error =
     67           GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutdown connectivity owner");
     68     }
     69     GRPC_CLOSURE_SCHED(w->notify, error);
     70     gpr_free(w);
     71   }
     72   GRPC_ERROR_UNREF(tracker->current_error);
     73   gpr_free(tracker->name);
     74 }
     75 
     76 grpc_connectivity_state grpc_connectivity_state_check(
     77     grpc_connectivity_state_tracker* tracker) {
     78   grpc_connectivity_state cur = static_cast<grpc_connectivity_state>(
     79       gpr_atm_no_barrier_load(&tracker->current_state_atm));
     80   if (grpc_connectivity_state_trace.enabled()) {
     81     gpr_log(GPR_INFO, "CONWATCH: %p %s: get %s", tracker, tracker->name,
     82             grpc_connectivity_state_name(cur));
     83   }
     84   return cur;
     85 }
     86 
     87 grpc_connectivity_state grpc_connectivity_state_get(
     88     grpc_connectivity_state_tracker* tracker, grpc_error** error) {
     89   grpc_connectivity_state cur = static_cast<grpc_connectivity_state>(
     90       gpr_atm_no_barrier_load(&tracker->current_state_atm));
     91   if (grpc_connectivity_state_trace.enabled()) {
     92     gpr_log(GPR_INFO, "CONWATCH: %p %s: get %s", tracker, tracker->name,
     93             grpc_connectivity_state_name(cur));
     94   }
     95   if (error != nullptr) {
     96     *error = GRPC_ERROR_REF(tracker->current_error);
     97   }
     98   return cur;
     99 }
    100 
    101 bool grpc_connectivity_state_has_watchers(
    102     grpc_connectivity_state_tracker* connectivity_state) {
    103   return connectivity_state->watchers != nullptr;
    104 }
    105 
    106 bool grpc_connectivity_state_notify_on_state_change(
    107     grpc_connectivity_state_tracker* tracker, grpc_connectivity_state* current,
    108     grpc_closure* notify) {
    109   grpc_connectivity_state cur = static_cast<grpc_connectivity_state>(
    110       gpr_atm_no_barrier_load(&tracker->current_state_atm));
    111   if (grpc_connectivity_state_trace.enabled()) {
    112     if (current == nullptr) {
    113       gpr_log(GPR_INFO, "CONWATCH: %p %s: unsubscribe notify=%p", tracker,
    114               tracker->name, notify);
    115     } else {
    116       gpr_log(GPR_INFO, "CONWATCH: %p %s: from %s [cur=%s] notify=%p", tracker,
    117               tracker->name, grpc_connectivity_state_name(*current),
    118               grpc_connectivity_state_name(cur), notify);
    119     }
    120   }
    121   if (current == nullptr) {
    122     grpc_connectivity_state_watcher* w = tracker->watchers;
    123     if (w != nullptr && w->notify == notify) {
    124       GRPC_CLOSURE_SCHED(notify, GRPC_ERROR_CANCELLED);
    125       tracker->watchers = w->next;
    126       gpr_free(w);
    127       return false;
    128     }
    129     while (w != nullptr) {
    130       grpc_connectivity_state_watcher* rm_candidate = w->next;
    131       if (rm_candidate != nullptr && rm_candidate->notify == notify) {
    132         GRPC_CLOSURE_SCHED(notify, GRPC_ERROR_CANCELLED);
    133         w->next = w->next->next;
    134         gpr_free(rm_candidate);
    135         return false;
    136       }
    137       w = w->next;
    138     }
    139     return false;
    140   } else {
    141     if (cur != *current) {
    142       *current = cur;
    143       GRPC_CLOSURE_SCHED(notify, GRPC_ERROR_REF(tracker->current_error));
    144     } else {
    145       grpc_connectivity_state_watcher* w =
    146           static_cast<grpc_connectivity_state_watcher*>(gpr_malloc(sizeof(*w)));
    147       w->current = current;
    148       w->notify = notify;
    149       w->next = tracker->watchers;
    150       tracker->watchers = w;
    151     }
    152     return cur == GRPC_CHANNEL_IDLE;
    153   }
    154 }
    155 
    156 void grpc_connectivity_state_set(grpc_connectivity_state_tracker* tracker,
    157                                  grpc_connectivity_state state,
    158                                  grpc_error* error, const char* reason) {
    159   grpc_connectivity_state cur = static_cast<grpc_connectivity_state>(
    160       gpr_atm_no_barrier_load(&tracker->current_state_atm));
    161   grpc_connectivity_state_watcher* w;
    162   if (grpc_connectivity_state_trace.enabled()) {
    163     const char* error_string = grpc_error_string(error);
    164     gpr_log(GPR_INFO, "SET: %p %s: %s --> %s [%s] error=%p %s", tracker,
    165             tracker->name, grpc_connectivity_state_name(cur),
    166             grpc_connectivity_state_name(state), reason, error, error_string);
    167   }
    168   switch (state) {
    169     case GRPC_CHANNEL_CONNECTING:
    170     case GRPC_CHANNEL_IDLE:
    171     case GRPC_CHANNEL_READY:
    172       GPR_ASSERT(error == GRPC_ERROR_NONE);
    173       break;
    174     case GRPC_CHANNEL_SHUTDOWN:
    175     case GRPC_CHANNEL_TRANSIENT_FAILURE:
    176       GPR_ASSERT(error != GRPC_ERROR_NONE);
    177       break;
    178   }
    179   GRPC_ERROR_UNREF(tracker->current_error);
    180   tracker->current_error = error;
    181   if (cur == state) {
    182     return;
    183   }
    184   GPR_ASSERT(cur != GRPC_CHANNEL_SHUTDOWN);
    185   gpr_atm_no_barrier_store(&tracker->current_state_atm, state);
    186   while ((w = tracker->watchers) != nullptr) {
    187     *w->current = state;
    188     tracker->watchers = w->next;
    189     if (grpc_connectivity_state_trace.enabled()) {
    190       gpr_log(GPR_INFO, "NOTIFY: %p %s: %p", tracker, tracker->name, w->notify);
    191     }
    192     GRPC_CLOSURE_SCHED(w->notify, GRPC_ERROR_REF(tracker->current_error));
    193     gpr_free(w);
    194   }
    195 }
    196