1 // 2 // 3 // Copyright 2016 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 // 17 // 18 19 #include <grpc/support/port_platform.h> 20 21 #include "src/core/ext/filters/client_channel/subchannel_index.h" 22 23 #include <stdbool.h> 24 #include <string.h> 25 26 #include <grpc/support/alloc.h> 27 #include <grpc/support/string_util.h> 28 29 #include "src/core/lib/avl/avl.h" 30 #include "src/core/lib/channel/channel_args.h" 31 #include "src/core/lib/gpr/tls.h" 32 33 // a map of subchannel_key --> subchannel, used for detecting connections 34 // to the same destination in order to share them 35 static grpc_avl g_subchannel_index; 36 37 static gpr_mu g_mu; 38 39 static gpr_refcount g_refcount; 40 41 struct grpc_subchannel_key { 42 grpc_subchannel_args args; 43 }; 44 45 static bool g_force_creation = false; 46 47 static grpc_subchannel_key* create_key( 48 const grpc_subchannel_args* args, 49 grpc_channel_args* (*copy_channel_args)(const grpc_channel_args* args)) { 50 grpc_subchannel_key* k = 51 static_cast<grpc_subchannel_key*>(gpr_malloc(sizeof(*k))); 52 k->args.filter_count = args->filter_count; 53 if (k->args.filter_count > 0) { 54 k->args.filters = static_cast<const grpc_channel_filter**>( 55 gpr_malloc(sizeof(*k->args.filters) * k->args.filter_count)); 56 memcpy(reinterpret_cast<grpc_channel_filter*>(k->args.filters), 57 args->filters, sizeof(*k->args.filters) * k->args.filter_count); 58 } else { 59 k->args.filters = nullptr; 60 } 61 k->args.args = copy_channel_args(args->args); 62 return k; 63 } 64 65 grpc_subchannel_key* grpc_subchannel_key_create( 66 const grpc_subchannel_args* args) { 67 return create_key(args, grpc_channel_args_normalize); 68 } 69 70 static grpc_subchannel_key* subchannel_key_copy(grpc_subchannel_key* k) { 71 return create_key(&k->args, grpc_channel_args_copy); 72 } 73 74 int grpc_subchannel_key_compare(const grpc_subchannel_key* a, 75 const grpc_subchannel_key* b) { 76 // To pretend the keys are different, return a non-zero value. 77 if (GPR_UNLIKELY(g_force_creation)) return 1; 78 int c = GPR_ICMP(a->args.filter_count, b->args.filter_count); 79 if (c != 0) return c; 80 if (a->args.filter_count > 0) { 81 c = memcmp(a->args.filters, b->args.filters, 82 a->args.filter_count * sizeof(*a->args.filters)); 83 if (c != 0) return c; 84 } 85 return grpc_channel_args_compare(a->args.args, b->args.args); 86 } 87 88 void grpc_subchannel_key_destroy(grpc_subchannel_key* k) { 89 gpr_free(reinterpret_cast<grpc_channel_args*>(k->args.filters)); 90 grpc_channel_args_destroy(const_cast<grpc_channel_args*>(k->args.args)); 91 gpr_free(k); 92 } 93 94 static void sck_avl_destroy(void* p, void* user_data) { 95 grpc_subchannel_key_destroy(static_cast<grpc_subchannel_key*>(p)); 96 } 97 98 static void* sck_avl_copy(void* p, void* unused) { 99 return subchannel_key_copy(static_cast<grpc_subchannel_key*>(p)); 100 } 101 102 static long sck_avl_compare(void* a, void* b, void* unused) { 103 return grpc_subchannel_key_compare(static_cast<grpc_subchannel_key*>(a), 104 static_cast<grpc_subchannel_key*>(b)); 105 } 106 107 static void scv_avl_destroy(void* p, void* user_data) { 108 GRPC_SUBCHANNEL_WEAK_UNREF((grpc_subchannel*)p, "subchannel_index"); 109 } 110 111 static void* scv_avl_copy(void* p, void* unused) { 112 GRPC_SUBCHANNEL_WEAK_REF((grpc_subchannel*)p, "subchannel_index"); 113 return p; 114 } 115 116 static const grpc_avl_vtable subchannel_avl_vtable = { 117 sck_avl_destroy, // destroy_key 118 sck_avl_copy, // copy_key 119 sck_avl_compare, // compare_keys 120 scv_avl_destroy, // destroy_value 121 scv_avl_copy // copy_value 122 }; 123 124 void grpc_subchannel_index_init(void) { 125 g_subchannel_index = grpc_avl_create(&subchannel_avl_vtable); 126 gpr_mu_init(&g_mu); 127 gpr_ref_init(&g_refcount, 1); 128 } 129 130 void grpc_subchannel_index_shutdown(void) { 131 // TODO(juanlishen): This refcounting mechanism may lead to memory leackage. 132 // To solve that, we should force polling to flush any pending callbacks, then 133 // shutdown safely. 134 grpc_subchannel_index_unref(); 135 } 136 137 void grpc_subchannel_index_unref(void) { 138 if (gpr_unref(&g_refcount)) { 139 gpr_mu_destroy(&g_mu); 140 grpc_avl_unref(g_subchannel_index, grpc_core::ExecCtx::Get()); 141 } 142 } 143 144 void grpc_subchannel_index_ref(void) { gpr_ref_non_zero(&g_refcount); } 145 146 grpc_subchannel* grpc_subchannel_index_find(grpc_subchannel_key* key) { 147 // Lock, and take a reference to the subchannel index. 148 // We don't need to do the search under a lock as avl's are immutable. 149 gpr_mu_lock(&g_mu); 150 grpc_avl index = grpc_avl_ref(g_subchannel_index, grpc_core::ExecCtx::Get()); 151 gpr_mu_unlock(&g_mu); 152 153 grpc_subchannel* c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF( 154 (grpc_subchannel*)grpc_avl_get(index, key, grpc_core::ExecCtx::Get()), 155 "index_find"); 156 grpc_avl_unref(index, grpc_core::ExecCtx::Get()); 157 158 return c; 159 } 160 161 grpc_subchannel* grpc_subchannel_index_register(grpc_subchannel_key* key, 162 grpc_subchannel* constructed) { 163 grpc_subchannel* c = nullptr; 164 bool need_to_unref_constructed = false; 165 166 while (c == nullptr) { 167 need_to_unref_constructed = false; 168 169 // Compare and swap loop: 170 // - take a reference to the current index 171 gpr_mu_lock(&g_mu); 172 grpc_avl index = 173 grpc_avl_ref(g_subchannel_index, grpc_core::ExecCtx::Get()); 174 gpr_mu_unlock(&g_mu); 175 176 // - Check to see if a subchannel already exists 177 c = static_cast<grpc_subchannel*>( 178 grpc_avl_get(index, key, grpc_core::ExecCtx::Get())); 179 if (c != nullptr) { 180 c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(c, "index_register"); 181 } 182 if (c != nullptr) { 183 // yes -> we're done 184 need_to_unref_constructed = true; 185 } else { 186 // no -> update the avl and compare/swap 187 grpc_avl updated = 188 grpc_avl_add(grpc_avl_ref(index, grpc_core::ExecCtx::Get()), 189 subchannel_key_copy(key), 190 GRPC_SUBCHANNEL_WEAK_REF(constructed, "index_register"), 191 grpc_core::ExecCtx::Get()); 192 193 // it may happen (but it's expected to be unlikely) 194 // that some other thread has changed the index: 195 // compare/swap here to check that, and retry as necessary 196 gpr_mu_lock(&g_mu); 197 if (index.root == g_subchannel_index.root) { 198 GPR_SWAP(grpc_avl, updated, g_subchannel_index); 199 c = constructed; 200 } 201 gpr_mu_unlock(&g_mu); 202 203 grpc_avl_unref(updated, grpc_core::ExecCtx::Get()); 204 } 205 grpc_avl_unref(index, grpc_core::ExecCtx::Get()); 206 } 207 208 if (need_to_unref_constructed) { 209 GRPC_SUBCHANNEL_UNREF(constructed, "index_register"); 210 } 211 212 return c; 213 } 214 215 void grpc_subchannel_index_unregister(grpc_subchannel_key* key, 216 grpc_subchannel* constructed) { 217 bool done = false; 218 while (!done) { 219 // Compare and swap loop: 220 // - take a reference to the current index 221 gpr_mu_lock(&g_mu); 222 grpc_avl index = 223 grpc_avl_ref(g_subchannel_index, grpc_core::ExecCtx::Get()); 224 gpr_mu_unlock(&g_mu); 225 226 // Check to see if this key still refers to the previously 227 // registered subchannel 228 grpc_subchannel* c = static_cast<grpc_subchannel*>( 229 grpc_avl_get(index, key, grpc_core::ExecCtx::Get())); 230 if (c != constructed) { 231 grpc_avl_unref(index, grpc_core::ExecCtx::Get()); 232 break; 233 } 234 235 // compare and swap the update (some other thread may have 236 // mutated the index behind us) 237 grpc_avl updated = 238 grpc_avl_remove(grpc_avl_ref(index, grpc_core::ExecCtx::Get()), key, 239 grpc_core::ExecCtx::Get()); 240 241 gpr_mu_lock(&g_mu); 242 if (index.root == g_subchannel_index.root) { 243 GPR_SWAP(grpc_avl, updated, g_subchannel_index); 244 done = true; 245 } 246 gpr_mu_unlock(&g_mu); 247 248 grpc_avl_unref(updated, grpc_core::ExecCtx::Get()); 249 grpc_avl_unref(index, grpc_core::ExecCtx::Get()); 250 } 251 } 252 253 void grpc_subchannel_index_test_only_set_force_creation(bool force_creation) { 254 g_force_creation = force_creation; 255 } 256