Home | History | Annotate | Download | only in channel
      1 /*
      2  *
      3  * Copyright 2016 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #include <grpc/support/port_platform.h>
     20 
     21 #include <string.h>
     22 
     23 #include <grpc/support/alloc.h>
     24 #include <grpc/support/log.h>
     25 #include <grpc/support/string_util.h>
     26 
     27 #include "src/core/lib/channel/channel_args.h"
     28 #include "src/core/lib/channel/handshaker.h"
     29 #include "src/core/lib/debug/trace.h"
     30 #include "src/core/lib/iomgr/timer.h"
     31 #include "src/core/lib/slice/slice_internal.h"
     32 
// Trace flag registered under the name "handshaker".  The functions in this
// file consult grpc_handshaker_trace.enabled() before emitting their
// diagnostic gpr_log() output.  The flag is constructed with `false`
// (presumably "disabled by default" -- confirm against grpc_core::TraceFlag).
grpc_core::TraceFlag grpc_handshaker_trace(false, "handshaker");
     34 
     35 //
     36 // grpc_handshaker
     37 //
     38 
     39 void grpc_handshaker_init(const grpc_handshaker_vtable* vtable,
     40                           grpc_handshaker* handshaker) {
     41   handshaker->vtable = vtable;
     42 }
     43 
     44 void grpc_handshaker_destroy(grpc_handshaker* handshaker) {
     45   handshaker->vtable->destroy(handshaker);
     46 }
     47 
     48 void grpc_handshaker_shutdown(grpc_handshaker* handshaker, grpc_error* why) {
     49   handshaker->vtable->shutdown(handshaker, why);
     50 }
     51 
     52 void grpc_handshaker_do_handshake(grpc_handshaker* handshaker,
     53                                   grpc_tcp_server_acceptor* acceptor,
     54                                   grpc_closure* on_handshake_done,
     55                                   grpc_handshaker_args* args) {
     56   handshaker->vtable->do_handshake(handshaker, acceptor, on_handshake_done,
     57                                    args);
     58 }
     59 
     60 const char* grpc_handshaker_name(grpc_handshaker* handshaker) {
     61   return handshaker->vtable->name;
     62 }
     63 
     64 //
     65 // grpc_handshake_manager
     66 //
     67 
// State for running a sequence of handshakers over a single endpoint.
// Instances are allocated with gpr_zalloc() in
// grpc_handshake_manager_create() and freed when the last reference is
// dropped in grpc_handshake_manager_unref().
struct grpc_handshake_manager {
  // Held by the functions in this file while they touch the handshaker
  // array and the handshake progress state (shutdown, index, args).
  gpr_mu mu;
  // Reference count.  Starts at 1 for the creator; do_handshake() takes one
  // more for the deadline timer and one more for the handshaker chain.
  gpr_refcount refs;
  // Set once the manager has been shut down or the on_handshake_done
  // callback has been scheduled.
  bool shutdown;
  // An array of handshakers added via grpc_handshake_manager_add().
  size_t count;
  grpc_handshaker** handshakers;
  // The index of the handshaker to invoke next and closure to invoke it.
  // While a handshaker is in progress, it is the one at index - 1.
  size_t index;
  grpc_closure call_next_handshaker;
  // The acceptor to call the handshakers with.
  grpc_tcp_server_acceptor* acceptor;
  // Deadline timer across all handshakers.
  grpc_timer deadline_timer;
  grpc_closure on_timeout;
  // The final callback and user_data to invoke after the last handshaker.
  // NOTE(review): user_data is neither written nor read by the code visible
  // in this file; the caller's user_data is stored in args.user_data instead.
  grpc_closure on_handshake_done;
  void* user_data;
  // Handshaker args.
  grpc_handshaker_args args;
  // Links to the previous and next managers in a list of all pending
  // handshakes.  Used at server side only.
  grpc_handshake_manager* prev;
  grpc_handshake_manager* next;
};
     93 
     94 grpc_handshake_manager* grpc_handshake_manager_create() {
     95   grpc_handshake_manager* mgr = static_cast<grpc_handshake_manager*>(
     96       gpr_zalloc(sizeof(grpc_handshake_manager)));
     97   gpr_mu_init(&mgr->mu);
     98   gpr_ref_init(&mgr->refs, 1);
     99   return mgr;
    100 }
    101 
    102 void grpc_handshake_manager_pending_list_add(grpc_handshake_manager** head,
    103                                              grpc_handshake_manager* mgr) {
    104   GPR_ASSERT(mgr->prev == nullptr);
    105   GPR_ASSERT(mgr->next == nullptr);
    106   mgr->next = *head;
    107   if (*head) {
    108     (*head)->prev = mgr;
    109   }
    110   *head = mgr;
    111 }
    112 
    113 void grpc_handshake_manager_pending_list_remove(grpc_handshake_manager** head,
    114                                                 grpc_handshake_manager* mgr) {
    115   if (mgr->next != nullptr) {
    116     mgr->next->prev = mgr->prev;
    117   }
    118   if (mgr->prev != nullptr) {
    119     mgr->prev->next = mgr->next;
    120   } else {
    121     GPR_ASSERT(*head == mgr);
    122     *head = mgr->next;
    123   }
    124 }
    125 
    126 void grpc_handshake_manager_pending_list_shutdown_all(
    127     grpc_handshake_manager* head, grpc_error* why) {
    128   while (head != nullptr) {
    129     grpc_handshake_manager_shutdown(head, GRPC_ERROR_REF(why));
    130     head = head->next;
    131   }
    132   GRPC_ERROR_UNREF(why);
    133 }
    134 
    135 static bool is_power_of_2(size_t n) { return (n & (n - 1)) == 0; }
    136 
    137 void grpc_handshake_manager_add(grpc_handshake_manager* mgr,
    138                                 grpc_handshaker* handshaker) {
    139   if (grpc_handshaker_trace.enabled()) {
    140     gpr_log(
    141         GPR_INFO,
    142         "handshake_manager %p: adding handshaker %s [%p] at index %" PRIuPTR,
    143         mgr, grpc_handshaker_name(handshaker), handshaker, mgr->count);
    144   }
    145   gpr_mu_lock(&mgr->mu);
    146   // To avoid allocating memory for each handshaker we add, we double
    147   // the number of elements every time we need more.
    148   size_t realloc_count = 0;
    149   if (mgr->count == 0) {
    150     realloc_count = 2;
    151   } else if (mgr->count >= 2 && is_power_of_2(mgr->count)) {
    152     realloc_count = mgr->count * 2;
    153   }
    154   if (realloc_count > 0) {
    155     mgr->handshakers = static_cast<grpc_handshaker**>(gpr_realloc(
    156         mgr->handshakers, realloc_count * sizeof(grpc_handshaker*)));
    157   }
    158   mgr->handshakers[mgr->count++] = handshaker;
    159   gpr_mu_unlock(&mgr->mu);
    160 }
    161 
    162 static void grpc_handshake_manager_unref(grpc_handshake_manager* mgr) {
    163   if (gpr_unref(&mgr->refs)) {
    164     for (size_t i = 0; i < mgr->count; ++i) {
    165       grpc_handshaker_destroy(mgr->handshakers[i]);
    166     }
    167     gpr_free(mgr->handshakers);
    168     gpr_mu_destroy(&mgr->mu);
    169     gpr_free(mgr);
    170   }
    171 }
    172 
    173 void grpc_handshake_manager_destroy(grpc_handshake_manager* mgr) {
    174   grpc_handshake_manager_unref(mgr);
    175 }
    176 
    177 void grpc_handshake_manager_shutdown(grpc_handshake_manager* mgr,
    178                                      grpc_error* why) {
    179   gpr_mu_lock(&mgr->mu);
    180   // Shutdown the handshaker that's currently in progress, if any.
    181   if (!mgr->shutdown && mgr->index > 0) {
    182     mgr->shutdown = true;
    183     grpc_handshaker_shutdown(mgr->handshakers[mgr->index - 1],
    184                              GRPC_ERROR_REF(why));
    185   }
    186   gpr_mu_unlock(&mgr->mu);
    187   GRPC_ERROR_UNREF(why);
    188 }
    189 
    190 static char* handshaker_args_string(grpc_handshaker_args* args) {
    191   char* args_str = grpc_channel_args_string(args->args);
    192   size_t num_args = args->args != nullptr ? args->args->num_args : 0;
    193   size_t read_buffer_length =
    194       args->read_buffer != nullptr ? args->read_buffer->length : 0;
    195   char* str;
    196   gpr_asprintf(&str,
    197                "{endpoint=%p, args=%p {size=%" PRIuPTR
    198                ": %s}, read_buffer=%p (length=%" PRIuPTR "), exit_early=%d}",
    199                args->endpoint, args->args, num_args, args_str,
    200                args->read_buffer, read_buffer_length, args->exit_early);
    201   gpr_free(args_str);
    202   return str;
    203 }
    204 
    205 // Helper function to call either the next handshaker or the
    206 // on_handshake_done callback.
    207 // Returns true if we've scheduled the on_handshake_done callback.
    208 static bool call_next_handshaker_locked(grpc_handshake_manager* mgr,
    209                                         grpc_error* error) {
    210   if (grpc_handshaker_trace.enabled()) {
    211     char* args_str = handshaker_args_string(&mgr->args);
    212     gpr_log(GPR_INFO,
    213             "handshake_manager %p: error=%s shutdown=%d index=%" PRIuPTR
    214             ", args=%s",
    215             mgr, grpc_error_string(error), mgr->shutdown, mgr->index, args_str);
    216     gpr_free(args_str);
    217   }
    218   GPR_ASSERT(mgr->index <= mgr->count);
    219   // If we got an error or we've been shut down or we're exiting early or
    220   // we've finished the last handshaker, invoke the on_handshake_done
    221   // callback.  Otherwise, call the next handshaker.
    222   if (error != GRPC_ERROR_NONE || mgr->shutdown || mgr->args.exit_early ||
    223       mgr->index == mgr->count) {
    224     if (error == GRPC_ERROR_NONE && mgr->shutdown) {
    225       error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("handshaker shutdown");
    226       // It is possible that the endpoint has already been destroyed by
    227       // a shutdown call while this callback was sitting on the ExecCtx
    228       // with no error.
    229       if (mgr->args.endpoint != nullptr) {
    230         // TODO(roth): It is currently necessary to shutdown endpoints
    231         // before destroying then, even when we know that there are no
    232         // pending read/write callbacks.  This should be fixed, at which
    233         // point this can be removed.
    234         grpc_endpoint_shutdown(mgr->args.endpoint, GRPC_ERROR_REF(error));
    235         grpc_endpoint_destroy(mgr->args.endpoint);
    236         mgr->args.endpoint = nullptr;
    237         grpc_channel_args_destroy(mgr->args.args);
    238         mgr->args.args = nullptr;
    239         grpc_slice_buffer_destroy_internal(mgr->args.read_buffer);
    240         gpr_free(mgr->args.read_buffer);
    241         mgr->args.read_buffer = nullptr;
    242       }
    243     }
    244     if (grpc_handshaker_trace.enabled()) {
    245       gpr_log(GPR_INFO,
    246               "handshake_manager %p: handshaking complete -- scheduling "
    247               "on_handshake_done with error=%s",
    248               mgr, grpc_error_string(error));
    249     }
    250     // Cancel deadline timer, since we're invoking the on_handshake_done
    251     // callback now.
    252     grpc_timer_cancel(&mgr->deadline_timer);
    253     GRPC_CLOSURE_SCHED(&mgr->on_handshake_done, error);
    254     mgr->shutdown = true;
    255   } else {
    256     if (grpc_handshaker_trace.enabled()) {
    257       gpr_log(
    258           GPR_INFO,
    259           "handshake_manager %p: calling handshaker %s [%p] at index %" PRIuPTR,
    260           mgr, grpc_handshaker_name(mgr->handshakers[mgr->index]),
    261           mgr->handshakers[mgr->index], mgr->index);
    262     }
    263     grpc_handshaker_do_handshake(mgr->handshakers[mgr->index], mgr->acceptor,
    264                                  &mgr->call_next_handshaker, &mgr->args);
    265   }
    266   ++mgr->index;
    267   return mgr->shutdown;
    268 }
    269 
    270 // A function used as the handshaker-done callback when chaining
    271 // handshakers together.
    272 static void call_next_handshaker(void* arg, grpc_error* error) {
    273   grpc_handshake_manager* mgr = static_cast<grpc_handshake_manager*>(arg);
    274   gpr_mu_lock(&mgr->mu);
    275   bool done = call_next_handshaker_locked(mgr, GRPC_ERROR_REF(error));
    276   gpr_mu_unlock(&mgr->mu);
    277   // If we're invoked the final callback, we won't be coming back
    278   // to this function, so we can release our reference to the
    279   // handshake manager.
    280   if (done) {
    281     grpc_handshake_manager_unref(mgr);
    282   }
    283 }
    284 
    285 // Callback invoked when deadline is exceeded.
    286 static void on_timeout(void* arg, grpc_error* error) {
    287   grpc_handshake_manager* mgr = static_cast<grpc_handshake_manager*>(arg);
    288   if (error == GRPC_ERROR_NONE) {  // Timer fired, rather than being cancelled.
    289     grpc_handshake_manager_shutdown(
    290         mgr, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Handshake timed out"));
    291   }
    292   grpc_handshake_manager_unref(mgr);
    293 }
    294 
    295 void grpc_handshake_manager_do_handshake(
    296     grpc_handshake_manager* mgr, grpc_pollset_set* interested_parties,
    297     grpc_endpoint* endpoint, const grpc_channel_args* channel_args,
    298     grpc_millis deadline, grpc_tcp_server_acceptor* acceptor,
    299     grpc_iomgr_cb_func on_handshake_done, void* user_data) {
    300   gpr_mu_lock(&mgr->mu);
    301   GPR_ASSERT(mgr->index == 0);
    302   GPR_ASSERT(!mgr->shutdown);
    303   // Construct handshaker args.  These will be passed through all
    304   // handshakers and eventually be freed by the on_handshake_done callback.
    305   mgr->args.interested_parties = interested_parties;
    306   mgr->args.endpoint = endpoint;
    307   mgr->args.args = grpc_channel_args_copy(channel_args);
    308   mgr->args.user_data = user_data;
    309   mgr->args.read_buffer = static_cast<grpc_slice_buffer*>(
    310       gpr_malloc(sizeof(*mgr->args.read_buffer)));
    311   grpc_slice_buffer_init(mgr->args.read_buffer);
    312   // Initialize state needed for calling handshakers.
    313   mgr->acceptor = acceptor;
    314   GRPC_CLOSURE_INIT(&mgr->call_next_handshaker, call_next_handshaker, mgr,
    315                     grpc_schedule_on_exec_ctx);
    316   GRPC_CLOSURE_INIT(&mgr->on_handshake_done, on_handshake_done, &mgr->args,
    317                     grpc_schedule_on_exec_ctx);
    318   // Start deadline timer, which owns a ref.
    319   gpr_ref(&mgr->refs);
    320   GRPC_CLOSURE_INIT(&mgr->on_timeout, on_timeout, mgr,
    321                     grpc_schedule_on_exec_ctx);
    322   grpc_timer_init(&mgr->deadline_timer, deadline, &mgr->on_timeout);
    323   // Start first handshaker, which also owns a ref.
    324   gpr_ref(&mgr->refs);
    325   bool done = call_next_handshaker_locked(mgr, GRPC_ERROR_NONE);
    326   gpr_mu_unlock(&mgr->mu);
    327   if (done) {
    328     grpc_handshake_manager_unref(mgr);
    329   }
    330 }
    331