Home | History | Annotate | Download | only in client_channel
      1 //
      2 //
      3 // Copyright 2016 gRPC authors.
      4 //
      5 // Licensed under the Apache License, Version 2.0 (the "License");
      6 // you may not use this file except in compliance with the License.
      7 // You may obtain a copy of the License at
      8 //
      9 //     http://www.apache.org/licenses/LICENSE-2.0
     10 //
     11 // Unless required by applicable law or agreed to in writing, software
     12 // distributed under the License is distributed on an "AS IS" BASIS,
     13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14 // See the License for the specific language governing permissions and
     15 // limitations under the License.
     16 //
     17 //
     18 
     19 #include <grpc/support/port_platform.h>
     20 
     21 #include "src/core/ext/filters/client_channel/subchannel_index.h"
     22 
     23 #include <stdbool.h>
     24 #include <string.h>
     25 
     26 #include <grpc/support/alloc.h>
     27 #include <grpc/support/string_util.h>
     28 
     29 #include "src/core/lib/avl/avl.h"
     30 #include "src/core/lib/channel/channel_args.h"
     31 #include "src/core/lib/gpr/tls.h"
     32 
     33 // a map of subchannel_key --> subchannel, used for detecting connections
     34 // to the same destination in order to share them
     35 static grpc_avl g_subchannel_index;
     36 
     37 static gpr_mu g_mu;
     38 
     39 static gpr_refcount g_refcount;
     40 
     41 struct grpc_subchannel_key {
     42   grpc_subchannel_args args;
     43 };
     44 
     45 static bool g_force_creation = false;
     46 
     47 static grpc_subchannel_key* create_key(
     48     const grpc_subchannel_args* args,
     49     grpc_channel_args* (*copy_channel_args)(const grpc_channel_args* args)) {
     50   grpc_subchannel_key* k =
     51       static_cast<grpc_subchannel_key*>(gpr_malloc(sizeof(*k)));
     52   k->args.filter_count = args->filter_count;
     53   if (k->args.filter_count > 0) {
     54     k->args.filters = static_cast<const grpc_channel_filter**>(
     55         gpr_malloc(sizeof(*k->args.filters) * k->args.filter_count));
     56     memcpy(reinterpret_cast<grpc_channel_filter*>(k->args.filters),
     57            args->filters, sizeof(*k->args.filters) * k->args.filter_count);
     58   } else {
     59     k->args.filters = nullptr;
     60   }
     61   k->args.args = copy_channel_args(args->args);
     62   return k;
     63 }
     64 
     65 grpc_subchannel_key* grpc_subchannel_key_create(
     66     const grpc_subchannel_args* args) {
     67   return create_key(args, grpc_channel_args_normalize);
     68 }
     69 
     70 static grpc_subchannel_key* subchannel_key_copy(grpc_subchannel_key* k) {
     71   return create_key(&k->args, grpc_channel_args_copy);
     72 }
     73 
     74 int grpc_subchannel_key_compare(const grpc_subchannel_key* a,
     75                                 const grpc_subchannel_key* b) {
     76   // To pretend the keys are different, return a non-zero value.
     77   if (GPR_UNLIKELY(g_force_creation)) return 1;
     78   int c = GPR_ICMP(a->args.filter_count, b->args.filter_count);
     79   if (c != 0) return c;
     80   if (a->args.filter_count > 0) {
     81     c = memcmp(a->args.filters, b->args.filters,
     82                a->args.filter_count * sizeof(*a->args.filters));
     83     if (c != 0) return c;
     84   }
     85   return grpc_channel_args_compare(a->args.args, b->args.args);
     86 }
     87 
     88 void grpc_subchannel_key_destroy(grpc_subchannel_key* k) {
     89   gpr_free(reinterpret_cast<grpc_channel_args*>(k->args.filters));
     90   grpc_channel_args_destroy(const_cast<grpc_channel_args*>(k->args.args));
     91   gpr_free(k);
     92 }
     93 
     94 static void sck_avl_destroy(void* p, void* user_data) {
     95   grpc_subchannel_key_destroy(static_cast<grpc_subchannel_key*>(p));
     96 }
     97 
     98 static void* sck_avl_copy(void* p, void* unused) {
     99   return subchannel_key_copy(static_cast<grpc_subchannel_key*>(p));
    100 }
    101 
    102 static long sck_avl_compare(void* a, void* b, void* unused) {
    103   return grpc_subchannel_key_compare(static_cast<grpc_subchannel_key*>(a),
    104                                      static_cast<grpc_subchannel_key*>(b));
    105 }
    106 
    107 static void scv_avl_destroy(void* p, void* user_data) {
    108   GRPC_SUBCHANNEL_WEAK_UNREF((grpc_subchannel*)p, "subchannel_index");
    109 }
    110 
    111 static void* scv_avl_copy(void* p, void* unused) {
    112   GRPC_SUBCHANNEL_WEAK_REF((grpc_subchannel*)p, "subchannel_index");
    113   return p;
    114 }
    115 
    116 static const grpc_avl_vtable subchannel_avl_vtable = {
    117     sck_avl_destroy,  // destroy_key
    118     sck_avl_copy,     // copy_key
    119     sck_avl_compare,  // compare_keys
    120     scv_avl_destroy,  // destroy_value
    121     scv_avl_copy      // copy_value
    122 };
    123 
    124 void grpc_subchannel_index_init(void) {
    125   g_subchannel_index = grpc_avl_create(&subchannel_avl_vtable);
    126   gpr_mu_init(&g_mu);
    127   gpr_ref_init(&g_refcount, 1);
    128 }
    129 
    130 void grpc_subchannel_index_shutdown(void) {
    131   // TODO(juanlishen): This refcounting mechanism may lead to memory leackage.
    132   // To solve that, we should force polling to flush any pending callbacks, then
    133   // shutdown safely.
    134   grpc_subchannel_index_unref();
    135 }
    136 
    137 void grpc_subchannel_index_unref(void) {
    138   if (gpr_unref(&g_refcount)) {
    139     gpr_mu_destroy(&g_mu);
    140     grpc_avl_unref(g_subchannel_index, grpc_core::ExecCtx::Get());
    141   }
    142 }
    143 
    144 void grpc_subchannel_index_ref(void) { gpr_ref_non_zero(&g_refcount); }
    145 
    146 grpc_subchannel* grpc_subchannel_index_find(grpc_subchannel_key* key) {
    147   // Lock, and take a reference to the subchannel index.
    148   // We don't need to do the search under a lock as avl's are immutable.
    149   gpr_mu_lock(&g_mu);
    150   grpc_avl index = grpc_avl_ref(g_subchannel_index, grpc_core::ExecCtx::Get());
    151   gpr_mu_unlock(&g_mu);
    152 
    153   grpc_subchannel* c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(
    154       (grpc_subchannel*)grpc_avl_get(index, key, grpc_core::ExecCtx::Get()),
    155       "index_find");
    156   grpc_avl_unref(index, grpc_core::ExecCtx::Get());
    157 
    158   return c;
    159 }
    160 
    161 grpc_subchannel* grpc_subchannel_index_register(grpc_subchannel_key* key,
    162                                                 grpc_subchannel* constructed) {
    163   grpc_subchannel* c = nullptr;
    164   bool need_to_unref_constructed = false;
    165 
    166   while (c == nullptr) {
    167     need_to_unref_constructed = false;
    168 
    169     // Compare and swap loop:
    170     // - take a reference to the current index
    171     gpr_mu_lock(&g_mu);
    172     grpc_avl index =
    173         grpc_avl_ref(g_subchannel_index, grpc_core::ExecCtx::Get());
    174     gpr_mu_unlock(&g_mu);
    175 
    176     // - Check to see if a subchannel already exists
    177     c = static_cast<grpc_subchannel*>(
    178         grpc_avl_get(index, key, grpc_core::ExecCtx::Get()));
    179     if (c != nullptr) {
    180       c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(c, "index_register");
    181     }
    182     if (c != nullptr) {
    183       // yes -> we're done
    184       need_to_unref_constructed = true;
    185     } else {
    186       // no -> update the avl and compare/swap
    187       grpc_avl updated =
    188           grpc_avl_add(grpc_avl_ref(index, grpc_core::ExecCtx::Get()),
    189                        subchannel_key_copy(key),
    190                        GRPC_SUBCHANNEL_WEAK_REF(constructed, "index_register"),
    191                        grpc_core::ExecCtx::Get());
    192 
    193       // it may happen (but it's expected to be unlikely)
    194       // that some other thread has changed the index:
    195       // compare/swap here to check that, and retry as necessary
    196       gpr_mu_lock(&g_mu);
    197       if (index.root == g_subchannel_index.root) {
    198         GPR_SWAP(grpc_avl, updated, g_subchannel_index);
    199         c = constructed;
    200       }
    201       gpr_mu_unlock(&g_mu);
    202 
    203       grpc_avl_unref(updated, grpc_core::ExecCtx::Get());
    204     }
    205     grpc_avl_unref(index, grpc_core::ExecCtx::Get());
    206   }
    207 
    208   if (need_to_unref_constructed) {
    209     GRPC_SUBCHANNEL_UNREF(constructed, "index_register");
    210   }
    211 
    212   return c;
    213 }
    214 
    215 void grpc_subchannel_index_unregister(grpc_subchannel_key* key,
    216                                       grpc_subchannel* constructed) {
    217   bool done = false;
    218   while (!done) {
    219     // Compare and swap loop:
    220     // - take a reference to the current index
    221     gpr_mu_lock(&g_mu);
    222     grpc_avl index =
    223         grpc_avl_ref(g_subchannel_index, grpc_core::ExecCtx::Get());
    224     gpr_mu_unlock(&g_mu);
    225 
    226     // Check to see if this key still refers to the previously
    227     // registered subchannel
    228     grpc_subchannel* c = static_cast<grpc_subchannel*>(
    229         grpc_avl_get(index, key, grpc_core::ExecCtx::Get()));
    230     if (c != constructed) {
    231       grpc_avl_unref(index, grpc_core::ExecCtx::Get());
    232       break;
    233     }
    234 
    235     // compare and swap the update (some other thread may have
    236     // mutated the index behind us)
    237     grpc_avl updated =
    238         grpc_avl_remove(grpc_avl_ref(index, grpc_core::ExecCtx::Get()), key,
    239                         grpc_core::ExecCtx::Get());
    240 
    241     gpr_mu_lock(&g_mu);
    242     if (index.root == g_subchannel_index.root) {
    243       GPR_SWAP(grpc_avl, updated, g_subchannel_index);
    244       done = true;
    245     }
    246     gpr_mu_unlock(&g_mu);
    247 
    248     grpc_avl_unref(updated, grpc_core::ExecCtx::Get());
    249     grpc_avl_unref(index, grpc_core::ExecCtx::Get());
    250   }
    251 }
    252 
    253 void grpc_subchannel_index_test_only_set_force_creation(bool force_creation) {
    254   g_force_creation = force_creation;
    255 }
    256