Home | History | Annotate | Download | only in surface
      1 /*
      2  *
      3  * Copyright 2015-2016 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #include <grpc/support/port_platform.h>
     20 
     21 #include "src/core/lib/surface/server.h"
     22 
     23 #include <limits.h>
     24 #include <stdlib.h>
     25 #include <string.h>
     26 
     27 #include <grpc/support/alloc.h>
     28 #include <grpc/support/log.h>
     29 #include <grpc/support/string_util.h>
     30 
     31 #include "src/core/lib/channel/channel_args.h"
     32 #include "src/core/lib/channel/connected_channel.h"
     33 #include "src/core/lib/debug/stats.h"
     34 #include "src/core/lib/gpr/mpscq.h"
     35 #include "src/core/lib/gpr/spinlock.h"
     36 #include "src/core/lib/gpr/string.h"
     37 #include "src/core/lib/iomgr/executor.h"
     38 #include "src/core/lib/iomgr/iomgr.h"
     39 #include "src/core/lib/slice/slice_internal.h"
     40 #include "src/core/lib/surface/api_trace.h"
     41 #include "src/core/lib/surface/call.h"
     42 #include "src/core/lib/surface/channel.h"
     43 #include "src/core/lib/surface/completion_queue.h"
     44 #include "src/core/lib/surface/init.h"
     45 #include "src/core/lib/transport/metadata.h"
     46 #include "src/core/lib/transport/static_metadata.h"
     47 
     48 grpc_core::TraceFlag grpc_server_channel_trace(false, "server_channel");
     49 
     50 namespace {
     51 struct listener {
     52   void* arg;
     53   void (*start)(grpc_server* server, void* arg, grpc_pollset** pollsets,
     54                 size_t pollset_count);
     55   void (*destroy)(grpc_server* server, void* arg, grpc_closure* closure);
     56   struct listener* next;
     57   grpc_closure destroy_done;
     58 };
     59 
     60 enum requested_call_type { BATCH_CALL, REGISTERED_CALL };
     61 
     62 struct registered_method;
     63 
     64 struct requested_call {
     65   gpr_mpscq_node request_link; /* must be first */
     66   requested_call_type type;
     67   size_t cq_idx;
     68   void* tag;
     69   grpc_server* server;
     70   grpc_completion_queue* cq_bound_to_call;
     71   grpc_call** call;
     72   grpc_cq_completion completion;
     73   grpc_metadata_array* initial_metadata;
     74   union {
     75     struct {
     76       grpc_call_details* details;
     77     } batch;
     78     struct {
     79       registered_method* method;
     80       gpr_timespec* deadline;
     81       grpc_byte_buffer** optional_payload;
     82     } registered;
     83   } data;
     84 };
     85 
     86 struct channel_registered_method {
     87   registered_method* server_registered_method;
     88   uint32_t flags;
     89   bool has_host;
     90   grpc_slice method;
     91   grpc_slice host;
     92 };
     93 
     94 struct channel_data {
     95   grpc_server* server;
     96   grpc_connectivity_state connectivity_state;
     97   grpc_channel* channel;
     98   size_t cq_idx;
     99   /* linked list of all channels on a server */
    100   channel_data* next;
    101   channel_data* prev;
    102   channel_registered_method* registered_methods;
    103   uint32_t registered_method_slots;
    104   uint32_t registered_method_max_probes;
    105   grpc_closure finish_destroy_channel_closure;
    106   grpc_closure channel_connectivity_changed;
    107 };
    108 
    109 typedef struct shutdown_tag {
    110   void* tag;
    111   grpc_completion_queue* cq;
    112   grpc_cq_completion completion;
    113 } shutdown_tag;
    114 
    115 typedef enum {
    116   /* waiting for metadata */
    117   NOT_STARTED,
    118   /* inital metadata read, not flow controlled in yet */
    119   PENDING,
    120   /* flow controlled in, on completion queue */
    121   ACTIVATED,
    122   /* cancelled before being queued */
    123   ZOMBIED
    124 } call_state;
    125 
    126 typedef struct request_matcher request_matcher;
    127 
    128 struct call_data {
    129   grpc_call* call;
    130 
    131   gpr_atm state;
    132 
    133   bool path_set;
    134   bool host_set;
    135   grpc_slice path;
    136   grpc_slice host;
    137   grpc_millis deadline;
    138 
    139   grpc_completion_queue* cq_new;
    140 
    141   grpc_metadata_batch* recv_initial_metadata;
    142   uint32_t recv_initial_metadata_flags;
    143   grpc_metadata_array initial_metadata;
    144 
    145   request_matcher* matcher;
    146   grpc_byte_buffer* payload;
    147 
    148   grpc_closure got_initial_metadata;
    149   grpc_closure server_on_recv_initial_metadata;
    150   grpc_closure kill_zombie_closure;
    151   grpc_closure* on_done_recv_initial_metadata;
    152   grpc_closure recv_trailing_metadata_ready;
    153   grpc_error* error;
    154   grpc_closure* original_recv_trailing_metadata_ready;
    155 
    156   grpc_closure publish;
    157 
    158   call_data* pending_next;
    159 };
    160 
    161 struct request_matcher {
    162   grpc_server* server;
    163   call_data* pending_head;
    164   call_data* pending_tail;
    165   gpr_locked_mpscq* requests_per_cq;
    166 };
    167 
    168 struct registered_method {
    169   char* method;
    170   char* host;
    171   grpc_server_register_method_payload_handling payload_handling;
    172   uint32_t flags;
    173   /* one request matcher per method */
    174   request_matcher matcher;
    175   registered_method* next;
    176 };
    177 
    178 typedef struct {
    179   grpc_channel** channels;
    180   size_t num_channels;
    181 } channel_broadcaster;
    182 }  // namespace
    183 
    184 struct grpc_server {
    185   grpc_channel_args* channel_args;
    186 
    187   grpc_completion_queue** cqs;
    188   grpc_pollset** pollsets;
    189   size_t cq_count;
    190   size_t pollset_count;
    191   bool started;
    192 
    193   /* The two following mutexes control access to server-state
    194      mu_global controls access to non-call-related state (e.g., channel state)
    195      mu_call controls access to call-related state (e.g., the call lists)
    196 
    197      If they are ever required to be nested, you must lock mu_global
    198      before mu_call. This is currently used in shutdown processing
    199      (grpc_server_shutdown_and_notify and maybe_finish_shutdown) */
    200   gpr_mu mu_global; /* mutex for server and channel state */
    201   gpr_mu mu_call;   /* mutex for call-specific state */
    202 
    203   /* startup synchronization: flag is protected by mu_global, signals whether
    204      we are doing the listener start routine or not */
    205   bool starting;
    206   gpr_cv starting_cv;
    207 
    208   registered_method* registered_methods;
    209   /** one request matcher for unregistered methods */
    210   request_matcher unregistered_request_matcher;
    211 
    212   gpr_atm shutdown_flag;
    213   uint8_t shutdown_published;
    214   size_t num_shutdown_tags;
    215   shutdown_tag* shutdown_tags;
    216 
    217   channel_data root_channel_data;
    218 
    219   listener* listeners;
    220   int listeners_destroyed;
    221   gpr_refcount internal_refcount;
    222 
    223   /** when did we print the last shutdown progress message */
    224   gpr_timespec last_shutdown_message_time;
    225 
    226   grpc_core::RefCountedPtr<grpc_core::channelz::ServerNode> channelz_server;
    227 };
    228 
    229 #define SERVER_FROM_CALL_ELEM(elem) \
    230   (((channel_data*)(elem)->channel_data)->server)
    231 
    232 static void publish_new_rpc(void* calld, grpc_error* error);
    233 static void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc,
    234                       grpc_error* error);
    235 /* Before calling maybe_finish_shutdown, we must hold mu_global and not
    236    hold mu_call */
    237 static void maybe_finish_shutdown(grpc_server* server);
    238 
    239 /*
    240  * channel broadcaster
    241  */
    242 
    243 /* assumes server locked */
    244 static void channel_broadcaster_init(grpc_server* s, channel_broadcaster* cb) {
    245   channel_data* c;
    246   size_t count = 0;
    247   for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
    248     count++;
    249   }
    250   cb->num_channels = count;
    251   cb->channels = static_cast<grpc_channel**>(
    252       gpr_malloc(sizeof(*cb->channels) * cb->num_channels));
    253   count = 0;
    254   for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
    255     cb->channels[count++] = c->channel;
    256     GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast");
    257   }
    258 }
    259 
    260 struct shutdown_cleanup_args {
    261   grpc_closure closure;
    262   grpc_slice slice;
    263 };
    264 
    265 static void shutdown_cleanup(void* arg, grpc_error* error) {
    266   struct shutdown_cleanup_args* a =
    267       static_cast<struct shutdown_cleanup_args*>(arg);
    268   grpc_slice_unref_internal(a->slice);
    269   gpr_free(a);
    270 }
    271 
    272 static void send_shutdown(grpc_channel* channel, bool send_goaway,
    273                           grpc_error* send_disconnect) {
    274   struct shutdown_cleanup_args* sc =
    275       static_cast<struct shutdown_cleanup_args*>(gpr_malloc(sizeof(*sc)));
    276   GRPC_CLOSURE_INIT(&sc->closure, shutdown_cleanup, sc,
    277                     grpc_schedule_on_exec_ctx);
    278   grpc_transport_op* op = grpc_make_transport_op(&sc->closure);
    279   grpc_channel_element* elem;
    280 
    281   op->goaway_error =
    282       send_goaway ? grpc_error_set_int(
    283                         GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown"),
    284                         GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_OK)
    285                   : GRPC_ERROR_NONE;
    286   op->set_accept_stream = true;
    287   sc->slice = grpc_slice_from_copied_string("Server shutdown");
    288   op->disconnect_with_error = send_disconnect;
    289 
    290   elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
    291   elem->filter->start_transport_op(elem, op);
    292 }
    293 
    294 static void channel_broadcaster_shutdown(channel_broadcaster* cb,
    295                                          bool send_goaway,
    296                                          grpc_error* force_disconnect) {
    297   size_t i;
    298 
    299   for (i = 0; i < cb->num_channels; i++) {
    300     send_shutdown(cb->channels[i], send_goaway,
    301                   GRPC_ERROR_REF(force_disconnect));
    302     GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast");
    303   }
    304   gpr_free(cb->channels);
    305   GRPC_ERROR_UNREF(force_disconnect);
    306 }
    307 
    308 /*
    309  * request_matcher
    310  */
    311 
    312 static void request_matcher_init(request_matcher* rm, grpc_server* server) {
    313   memset(rm, 0, sizeof(*rm));
    314   rm->server = server;
    315   rm->requests_per_cq = static_cast<gpr_locked_mpscq*>(
    316       gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count));
    317   for (size_t i = 0; i < server->cq_count; i++) {
    318     gpr_locked_mpscq_init(&rm->requests_per_cq[i]);
    319   }
    320 }
    321 
    322 static void request_matcher_destroy(request_matcher* rm) {
    323   for (size_t i = 0; i < rm->server->cq_count; i++) {
    324     GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == nullptr);
    325     gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]);
    326   }
    327   gpr_free(rm->requests_per_cq);
    328 }
    329 
    330 static void kill_zombie(void* elem, grpc_error* error) {
    331   grpc_call_unref(
    332       grpc_call_from_top_element(static_cast<grpc_call_element*>(elem)));
    333 }
    334 
    335 static void request_matcher_zombify_all_pending_calls(request_matcher* rm) {
    336   while (rm->pending_head) {
    337     call_data* calld = rm->pending_head;
    338     rm->pending_head = calld->pending_next;
    339     gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
    340     GRPC_CLOSURE_INIT(
    341         &calld->kill_zombie_closure, kill_zombie,
    342         grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
    343         grpc_schedule_on_exec_ctx);
    344     GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
    345   }
    346 }
    347 
    348 static void request_matcher_kill_requests(grpc_server* server,
    349                                           request_matcher* rm,
    350                                           grpc_error* error) {
    351   requested_call* rc;
    352   for (size_t i = 0; i < server->cq_count; i++) {
    353     while ((rc = reinterpret_cast<requested_call*>(
    354                 gpr_locked_mpscq_pop(&rm->requests_per_cq[i]))) != nullptr) {
    355       fail_call(server, i, rc, GRPC_ERROR_REF(error));
    356     }
    357   }
    358   GRPC_ERROR_UNREF(error);
    359 }
    360 
    361 /*
    362  * server proper
    363  */
    364 
    365 static void server_ref(grpc_server* server) {
    366   gpr_ref(&server->internal_refcount);
    367 }
    368 
    369 static void server_delete(grpc_server* server) {
    370   registered_method* rm;
    371   size_t i;
    372   server->channelz_server.reset();
    373   grpc_channel_args_destroy(server->channel_args);
    374   gpr_mu_destroy(&server->mu_global);
    375   gpr_mu_destroy(&server->mu_call);
    376   gpr_cv_destroy(&server->starting_cv);
    377   while ((rm = server->registered_methods) != nullptr) {
    378     server->registered_methods = rm->next;
    379     if (server->started) {
    380       request_matcher_destroy(&rm->matcher);
    381     }
    382     gpr_free(rm->method);
    383     gpr_free(rm->host);
    384     gpr_free(rm);
    385   }
    386   if (server->started) {
    387     request_matcher_destroy(&server->unregistered_request_matcher);
    388   }
    389   for (i = 0; i < server->cq_count; i++) {
    390     GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
    391   }
    392   gpr_free(server->cqs);
    393   gpr_free(server->pollsets);
    394   gpr_free(server->shutdown_tags);
    395   gpr_free(server);
    396 }
    397 
    398 static void server_unref(grpc_server* server) {
    399   if (gpr_unref(&server->internal_refcount)) {
    400     server_delete(server);
    401   }
    402 }
    403 
    404 static int is_channel_orphaned(channel_data* chand) {
    405   return chand->next == chand;
    406 }
    407 
    408 static void orphan_channel(channel_data* chand) {
    409   chand->next->prev = chand->prev;
    410   chand->prev->next = chand->next;
    411   chand->next = chand->prev = chand;
    412 }
    413 
    414 static void finish_destroy_channel(void* cd, grpc_error* error) {
    415   channel_data* chand = static_cast<channel_data*>(cd);
    416   grpc_server* server = chand->server;
    417   GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "server");
    418   server_unref(server);
    419 }
    420 
    421 static void destroy_channel(channel_data* chand, grpc_error* error) {
    422   if (is_channel_orphaned(chand)) return;
    423   GPR_ASSERT(chand->server != nullptr);
    424   orphan_channel(chand);
    425   server_ref(chand->server);
    426   maybe_finish_shutdown(chand->server);
    427   GRPC_CLOSURE_INIT(&chand->finish_destroy_channel_closure,
    428                     finish_destroy_channel, chand, grpc_schedule_on_exec_ctx);
    429 
    430   if (grpc_server_channel_trace.enabled() && error != GRPC_ERROR_NONE) {
    431     const char* msg = grpc_error_string(error);
    432     gpr_log(GPR_INFO, "Disconnected client: %s", msg);
    433   }
    434   GRPC_ERROR_UNREF(error);
    435 
    436   grpc_transport_op* op =
    437       grpc_make_transport_op(&chand->finish_destroy_channel_closure);
    438   op->set_accept_stream = true;
    439   grpc_channel_next_op(grpc_channel_stack_element(
    440                            grpc_channel_get_channel_stack(chand->channel), 0),
    441                        op);
    442 }
    443 
    444 static void done_request_event(void* req, grpc_cq_completion* c) {
    445   gpr_free(req);
    446 }
    447 
    448 static void publish_call(grpc_server* server, call_data* calld, size_t cq_idx,
    449                          requested_call* rc) {
    450   grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call);
    451   grpc_call* call = calld->call;
    452   *rc->call = call;
    453   calld->cq_new = server->cqs[cq_idx];
    454   GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata);
    455   switch (rc->type) {
    456     case BATCH_CALL:
    457       GPR_ASSERT(calld->host_set);
    458       GPR_ASSERT(calld->path_set);
    459       rc->data.batch.details->host = grpc_slice_ref_internal(calld->host);
    460       rc->data.batch.details->method = grpc_slice_ref_internal(calld->path);
    461       rc->data.batch.details->deadline =
    462           grpc_millis_to_timespec(calld->deadline, GPR_CLOCK_MONOTONIC);
    463       rc->data.batch.details->flags = calld->recv_initial_metadata_flags;
    464       break;
    465     case REGISTERED_CALL:
    466       *rc->data.registered.deadline =
    467           grpc_millis_to_timespec(calld->deadline, GPR_CLOCK_MONOTONIC);
    468       if (rc->data.registered.optional_payload) {
    469         *rc->data.registered.optional_payload = calld->payload;
    470         calld->payload = nullptr;
    471       }
    472       break;
    473     default:
    474       GPR_UNREACHABLE_CODE(return );
    475   }
    476 
    477   grpc_cq_end_op(calld->cq_new, rc->tag, GRPC_ERROR_NONE, done_request_event,
    478                  rc, &rc->completion);
    479 }
    480 
    481 static void publish_new_rpc(void* arg, grpc_error* error) {
    482   grpc_call_element* call_elem = static_cast<grpc_call_element*>(arg);
    483   call_data* calld = static_cast<call_data*>(call_elem->call_data);
    484   channel_data* chand = static_cast<channel_data*>(call_elem->channel_data);
    485   request_matcher* rm = calld->matcher;
    486   grpc_server* server = rm->server;
    487 
    488   if (error != GRPC_ERROR_NONE || gpr_atm_acq_load(&server->shutdown_flag)) {
    489     gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
    490     GRPC_CLOSURE_INIT(
    491         &calld->kill_zombie_closure, kill_zombie,
    492         grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
    493         grpc_schedule_on_exec_ctx);
    494     GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_REF(error));
    495     return;
    496   }
    497 
    498   for (size_t i = 0; i < server->cq_count; i++) {
    499     size_t cq_idx = (chand->cq_idx + i) % server->cq_count;
    500     requested_call* rc = reinterpret_cast<requested_call*>(
    501         gpr_locked_mpscq_try_pop(&rm->requests_per_cq[cq_idx]));
    502     if (rc == nullptr) {
    503       continue;
    504     } else {
    505       GRPC_STATS_INC_SERVER_CQS_CHECKED(i);
    506       gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
    507       publish_call(server, calld, cq_idx, rc);
    508       return; /* early out */
    509     }
    510   }
    511 
    512   /* no cq to take the request found: queue it on the slow list */
    513   GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED();
    514   gpr_mu_lock(&server->mu_call);
    515 
    516   // We need to ensure that all the queues are empty.  We do this under
    517   // the server mu_call lock to ensure that if something is added to
    518   // an empty request queue, it will block until the call is actually
    519   // added to the pending list.
    520   for (size_t i = 0; i < server->cq_count; i++) {
    521     size_t cq_idx = (chand->cq_idx + i) % server->cq_count;
    522     requested_call* rc = reinterpret_cast<requested_call*>(
    523         gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]));
    524     if (rc == nullptr) {
    525       continue;
    526     } else {
    527       gpr_mu_unlock(&server->mu_call);
    528       GRPC_STATS_INC_SERVER_CQS_CHECKED(i + server->cq_count);
    529       gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
    530       publish_call(server, calld, cq_idx, rc);
    531       return; /* early out */
    532     }
    533   }
    534 
    535   gpr_atm_no_barrier_store(&calld->state, PENDING);
    536   if (rm->pending_head == nullptr) {
    537     rm->pending_tail = rm->pending_head = calld;
    538   } else {
    539     rm->pending_tail->pending_next = calld;
    540     rm->pending_tail = calld;
    541   }
    542   calld->pending_next = nullptr;
    543   gpr_mu_unlock(&server->mu_call);
    544 }
    545 
    546 static void finish_start_new_rpc(
    547     grpc_server* server, grpc_call_element* elem, request_matcher* rm,
    548     grpc_server_register_method_payload_handling payload_handling) {
    549   call_data* calld = static_cast<call_data*>(elem->call_data);
    550 
    551   if (gpr_atm_acq_load(&server->shutdown_flag)) {
    552     gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
    553     GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem,
    554                       grpc_schedule_on_exec_ctx);
    555     GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
    556     return;
    557   }
    558 
    559   calld->matcher = rm;
    560 
    561   switch (payload_handling) {
    562     case GRPC_SRM_PAYLOAD_NONE:
    563       publish_new_rpc(elem, GRPC_ERROR_NONE);
    564       break;
    565     case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: {
    566       grpc_op op;
    567       memset(&op, 0, sizeof(op));
    568       op.op = GRPC_OP_RECV_MESSAGE;
    569       op.data.recv_message.recv_message = &calld->payload;
    570       GRPC_CLOSURE_INIT(&calld->publish, publish_new_rpc, elem,
    571                         grpc_schedule_on_exec_ctx);
    572       grpc_call_start_batch_and_execute(calld->call, &op, 1, &calld->publish);
    573       break;
    574     }
    575   }
    576 }
    577 
    578 static void start_new_rpc(grpc_call_element* elem) {
    579   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
    580   call_data* calld = static_cast<call_data*>(elem->call_data);
    581   grpc_server* server = chand->server;
    582   uint32_t i;
    583   uint32_t hash;
    584   channel_registered_method* rm;
    585 
    586   if (chand->registered_methods && calld->path_set && calld->host_set) {
    587     /* TODO(ctiller): unify these two searches */
    588     /* check for an exact match with host */
    589     hash = GRPC_MDSTR_KV_HASH(grpc_slice_hash(calld->host),
    590                               grpc_slice_hash(calld->path));
    591     for (i = 0; i <= chand->registered_method_max_probes; i++) {
    592       rm = &chand->registered_methods[(hash + i) %
    593                                       chand->registered_method_slots];
    594       if (!rm) break;
    595       if (!rm->has_host) continue;
    596       if (!grpc_slice_eq(rm->host, calld->host)) continue;
    597       if (!grpc_slice_eq(rm->method, calld->path)) continue;
    598       if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) &&
    599           0 == (calld->recv_initial_metadata_flags &
    600                 GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) {
    601         continue;
    602       }
    603       finish_start_new_rpc(server, elem, &rm->server_registered_method->matcher,
    604                            rm->server_registered_method->payload_handling);
    605       return;
    606     }
    607     /* check for a wildcard method definition (no host set) */
    608     hash = GRPC_MDSTR_KV_HASH(0, grpc_slice_hash(calld->path));
    609     for (i = 0; i <= chand->registered_method_max_probes; i++) {
    610       rm = &chand->registered_methods[(hash + i) %
    611                                       chand->registered_method_slots];
    612       if (!rm) break;
    613       if (rm->has_host) continue;
    614       if (!grpc_slice_eq(rm->method, calld->path)) continue;
    615       if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) &&
    616           0 == (calld->recv_initial_metadata_flags &
    617                 GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) {
    618         continue;
    619       }
    620       finish_start_new_rpc(server, elem, &rm->server_registered_method->matcher,
    621                            rm->server_registered_method->payload_handling);
    622       return;
    623     }
    624   }
    625   finish_start_new_rpc(server, elem, &server->unregistered_request_matcher,
    626                        GRPC_SRM_PAYLOAD_NONE);
    627 }
    628 
    629 static int num_listeners(grpc_server* server) {
    630   listener* l;
    631   int n = 0;
    632   for (l = server->listeners; l; l = l->next) {
    633     n++;
    634   }
    635   return n;
    636 }
    637 
    638 static void done_shutdown_event(void* server, grpc_cq_completion* completion) {
    639   server_unref(static_cast<grpc_server*>(server));
    640 }
    641 
    642 static int num_channels(grpc_server* server) {
    643   channel_data* chand;
    644   int n = 0;
    645   for (chand = server->root_channel_data.next;
    646        chand != &server->root_channel_data; chand = chand->next) {
    647     n++;
    648   }
    649   return n;
    650 }
    651 
    652 static void kill_pending_work_locked(grpc_server* server, grpc_error* error) {
    653   if (server->started) {
    654     request_matcher_kill_requests(server, &server->unregistered_request_matcher,
    655                                   GRPC_ERROR_REF(error));
    656     request_matcher_zombify_all_pending_calls(
    657         &server->unregistered_request_matcher);
    658     for (registered_method* rm = server->registered_methods; rm;
    659          rm = rm->next) {
    660       request_matcher_kill_requests(server, &rm->matcher,
    661                                     GRPC_ERROR_REF(error));
    662       request_matcher_zombify_all_pending_calls(&rm->matcher);
    663     }
    664   }
    665   GRPC_ERROR_UNREF(error);
    666 }
    667 
    668 static void maybe_finish_shutdown(grpc_server* server) {
    669   size_t i;
    670   if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) {
    671     return;
    672   }
    673 
    674   kill_pending_work_locked(
    675       server, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
    676 
    677   if (server->root_channel_data.next != &server->root_channel_data ||
    678       server->listeners_destroyed < num_listeners(server)) {
    679     if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME),
    680                                   server->last_shutdown_message_time),
    681                      gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) {
    682       server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
    683       gpr_log(GPR_DEBUG,
    684               "Waiting for %d channels and %d/%d listeners to be destroyed"
    685               " before shutting down server",
    686               num_channels(server),
    687               num_listeners(server) - server->listeners_destroyed,
    688               num_listeners(server));
    689     }
    690     return;
    691   }
    692   server->shutdown_published = 1;
    693   for (i = 0; i < server->num_shutdown_tags; i++) {
    694     server_ref(server);
    695     grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag,
    696                    GRPC_ERROR_NONE, done_shutdown_event, server,
    697                    &server->shutdown_tags[i].completion);
    698   }
    699 }
    700 
    701 static void server_on_recv_initial_metadata(void* ptr, grpc_error* error) {
    702   grpc_call_element* elem = static_cast<grpc_call_element*>(ptr);
    703   call_data* calld = static_cast<call_data*>(elem->call_data);
    704   grpc_millis op_deadline;
    705 
    706   if (error == GRPC_ERROR_NONE) {
    707     GPR_ASSERT(calld->recv_initial_metadata->idx.named.path != nullptr);
    708     GPR_ASSERT(calld->recv_initial_metadata->idx.named.authority != nullptr);
    709     calld->path = grpc_slice_ref_internal(
    710         GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.path->md));
    711     calld->host = grpc_slice_ref_internal(
    712         GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.authority->md));
    713     calld->path_set = true;
    714     calld->host_set = true;
    715     grpc_metadata_batch_remove(calld->recv_initial_metadata,
    716                                calld->recv_initial_metadata->idx.named.path);
    717     grpc_metadata_batch_remove(
    718         calld->recv_initial_metadata,
    719         calld->recv_initial_metadata->idx.named.authority);
    720   } else {
    721     GRPC_ERROR_REF(error);
    722   }
    723   op_deadline = calld->recv_initial_metadata->deadline;
    724   if (op_deadline != GRPC_MILLIS_INF_FUTURE) {
    725     calld->deadline = op_deadline;
    726   }
    727   if (calld->host_set && calld->path_set) {
    728     /* do nothing */
    729   } else {
    730     grpc_error* src_error = error;
    731     error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
    732         "Missing :authority or :path", &error, 1);
    733     GRPC_ERROR_UNREF(src_error);
    734   }
    735 
    736   GRPC_CLOSURE_RUN(calld->on_done_recv_initial_metadata, error);
    737 }
    738 
    739 static void server_recv_trailing_metadata_ready(void* user_data,
    740                                                 grpc_error* err) {
    741   grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
    742   call_data* calld = static_cast<call_data*>(elem->call_data);
    743   err = grpc_error_add_child(GRPC_ERROR_REF(err), GRPC_ERROR_REF(calld->error));
    744   GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err);
    745 }
    746 
    747 static void server_mutate_op(grpc_call_element* elem,
    748                              grpc_transport_stream_op_batch* op) {
    749   call_data* calld = static_cast<call_data*>(elem->call_data);
    750 
    751   if (op->recv_initial_metadata) {
    752     GPR_ASSERT(op->payload->recv_initial_metadata.recv_flags == nullptr);
    753     calld->recv_initial_metadata =
    754         op->payload->recv_initial_metadata.recv_initial_metadata;
    755     calld->on_done_recv_initial_metadata =
    756         op->payload->recv_initial_metadata.recv_initial_metadata_ready;
    757     op->payload->recv_initial_metadata.recv_initial_metadata_ready =
    758         &calld->server_on_recv_initial_metadata;
    759     op->payload->recv_initial_metadata.recv_flags =
    760         &calld->recv_initial_metadata_flags;
    761   }
    762   if (op->recv_trailing_metadata) {
    763     calld->original_recv_trailing_metadata_ready =
    764         op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
    765     op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
    766         &calld->recv_trailing_metadata_ready;
    767   }
    768 }
    769 
    770 static void server_start_transport_stream_op_batch(
    771     grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
    772   server_mutate_op(elem, op);
    773   grpc_call_next_op(elem, op);
    774 }
    775 
    776 static void got_initial_metadata(void* ptr, grpc_error* error) {
    777   grpc_call_element* elem = static_cast<grpc_call_element*>(ptr);
    778   call_data* calld = static_cast<call_data*>(elem->call_data);
    779   if (error == GRPC_ERROR_NONE) {
    780     start_new_rpc(elem);
    781   } else {
    782     if (gpr_atm_full_cas(&calld->state, NOT_STARTED, ZOMBIED)) {
    783       GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem,
    784                         grpc_schedule_on_exec_ctx);
    785       GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
    786     } else if (gpr_atm_full_cas(&calld->state, PENDING, ZOMBIED)) {
    787       /* zombied call will be destroyed when it's removed from the pending
    788          queue... later */
    789     }
    790   }
    791 }
    792 
    793 static void accept_stream(void* cd, grpc_transport* transport,
    794                           const void* transport_server_data) {
    795   channel_data* chand = static_cast<channel_data*>(cd);
    796   /* create a call */
    797   grpc_call_create_args args;
    798   memset(&args, 0, sizeof(args));
    799   args.channel = chand->channel;
    800   args.server_transport_data = transport_server_data;
    801   args.send_deadline = GRPC_MILLIS_INF_FUTURE;
    802   args.server = chand->server;
    803   grpc_call* call;
    804   grpc_error* error = grpc_call_create(&args, &call);
    805   grpc_call_element* elem =
    806       grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
    807   if (error != GRPC_ERROR_NONE) {
    808     got_initial_metadata(elem, error);
    809     GRPC_ERROR_UNREF(error);
    810     return;
    811   }
    812   call_data* calld = static_cast<call_data*>(elem->call_data);
    813   grpc_op op;
    814   memset(&op, 0, sizeof(op));
    815   op.op = GRPC_OP_RECV_INITIAL_METADATA;
    816   op.data.recv_initial_metadata.recv_initial_metadata =
    817       &calld->initial_metadata;
    818   GRPC_CLOSURE_INIT(&calld->got_initial_metadata, got_initial_metadata, elem,
    819                     grpc_schedule_on_exec_ctx);
    820   grpc_call_start_batch_and_execute(call, &op, 1, &calld->got_initial_metadata);
    821 }
    822 
    823 static void channel_connectivity_changed(void* cd, grpc_error* error) {
    824   channel_data* chand = static_cast<channel_data*>(cd);
    825   grpc_server* server = chand->server;
    826   if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
    827     grpc_transport_op* op = grpc_make_transport_op(nullptr);
    828     op->on_connectivity_state_change = &chand->channel_connectivity_changed;
    829     op->connectivity_state = &chand->connectivity_state;
    830     grpc_channel_next_op(grpc_channel_stack_element(
    831                              grpc_channel_get_channel_stack(chand->channel), 0),
    832                          op);
    833   } else {
    834     gpr_mu_lock(&server->mu_global);
    835     destroy_channel(chand, GRPC_ERROR_REF(error));
    836     gpr_mu_unlock(&server->mu_global);
    837     GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "connectivity");
    838   }
    839 }
    840 
    841 static grpc_error* init_call_elem(grpc_call_element* elem,
    842                                   const grpc_call_element_args* args) {
    843   call_data* calld = static_cast<call_data*>(elem->call_data);
    844   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
    845   memset(calld, 0, sizeof(call_data));
    846   calld->deadline = GRPC_MILLIS_INF_FUTURE;
    847   calld->call = grpc_call_from_top_element(elem);
    848 
    849   GRPC_CLOSURE_INIT(&calld->server_on_recv_initial_metadata,
    850                     server_on_recv_initial_metadata, elem,
    851                     grpc_schedule_on_exec_ctx);
    852   GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
    853                     server_recv_trailing_metadata_ready, elem,
    854                     grpc_schedule_on_exec_ctx);
    855   server_ref(chand->server);
    856   return GRPC_ERROR_NONE;
    857 }
    858 
    859 static void destroy_call_elem(grpc_call_element* elem,
    860                               const grpc_call_final_info* final_info,
    861                               grpc_closure* ignored) {
    862   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
    863   call_data* calld = static_cast<call_data*>(elem->call_data);
    864 
    865   GPR_ASSERT(calld->state != PENDING);
    866   GRPC_ERROR_UNREF(calld->error);
    867   if (calld->host_set) {
    868     grpc_slice_unref_internal(calld->host);
    869   }
    870   if (calld->path_set) {
    871     grpc_slice_unref_internal(calld->path);
    872   }
    873   grpc_metadata_array_destroy(&calld->initial_metadata);
    874   grpc_byte_buffer_destroy(calld->payload);
    875 
    876   server_unref(chand->server);
    877 }
    878 
    879 static grpc_error* init_channel_elem(grpc_channel_element* elem,
    880                                      grpc_channel_element_args* args) {
    881   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
    882   GPR_ASSERT(args->is_first);
    883   GPR_ASSERT(!args->is_last);
    884   chand->server = nullptr;
    885   chand->channel = nullptr;
    886   chand->next = chand->prev = chand;
    887   chand->registered_methods = nullptr;
    888   chand->connectivity_state = GRPC_CHANNEL_IDLE;
    889   GRPC_CLOSURE_INIT(&chand->channel_connectivity_changed,
    890                     channel_connectivity_changed, chand,
    891                     grpc_schedule_on_exec_ctx);
    892   return GRPC_ERROR_NONE;
    893 }
    894 
    895 static void destroy_channel_elem(grpc_channel_element* elem) {
    896   size_t i;
    897   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
    898   if (chand->registered_methods) {
    899     for (i = 0; i < chand->registered_method_slots; i++) {
    900       grpc_slice_unref_internal(chand->registered_methods[i].method);
    901       if (chand->registered_methods[i].has_host) {
    902         grpc_slice_unref_internal(chand->registered_methods[i].host);
    903       }
    904     }
    905     gpr_free(chand->registered_methods);
    906   }
    907   if (chand->server) {
    908     gpr_mu_lock(&chand->server->mu_global);
    909     chand->next->prev = chand->prev;
    910     chand->prev->next = chand->next;
    911     chand->next = chand->prev = chand;
    912     maybe_finish_shutdown(chand->server);
    913     gpr_mu_unlock(&chand->server->mu_global);
    914     server_unref(chand->server);
    915   }
    916 }
    917 
    918 const grpc_channel_filter grpc_server_top_filter = {
    919     server_start_transport_stream_op_batch,
    920     grpc_channel_next_op,
    921     sizeof(call_data),
    922     init_call_elem,
    923     grpc_call_stack_ignore_set_pollset_or_pollset_set,
    924     destroy_call_elem,
    925     sizeof(channel_data),
    926     init_channel_elem,
    927     destroy_channel_elem,
    928     grpc_channel_next_get_info,
    929     "server",
    930 };
    931 
    932 static void register_completion_queue(grpc_server* server,
    933                                       grpc_completion_queue* cq,
    934                                       void* reserved) {
    935   size_t i, n;
    936   GPR_ASSERT(!reserved);
    937   for (i = 0; i < server->cq_count; i++) {
    938     if (server->cqs[i] == cq) return;
    939   }
    940 
    941   GRPC_CQ_INTERNAL_REF(cq, "server");
    942   n = server->cq_count++;
    943   server->cqs = static_cast<grpc_completion_queue**>(gpr_realloc(
    944       server->cqs, server->cq_count * sizeof(grpc_completion_queue*)));
    945   server->cqs[n] = cq;
    946 }
    947 
    948 void grpc_server_register_completion_queue(grpc_server* server,
    949                                            grpc_completion_queue* cq,
    950                                            void* reserved) {
    951   GRPC_API_TRACE(
    952       "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3,
    953       (server, cq, reserved));
    954 
    955   if (grpc_get_cq_completion_type(cq) != GRPC_CQ_NEXT) {
    956     gpr_log(GPR_INFO,
    957             "Completion queue which is not of type GRPC_CQ_NEXT is being "
    958             "registered as a server-completion-queue");
    959     /* Ideally we should log an error and abort but ruby-wrapped-language API
    960        calls grpc_completion_queue_pluck() on server completion queues */
    961   }
    962 
    963   register_completion_queue(server, cq, reserved);
    964 }
    965 
    966 grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) {
    967   grpc_core::ExecCtx exec_ctx;
    968   GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved));
    969 
    970   grpc_server* server =
    971       static_cast<grpc_server*>(gpr_zalloc(sizeof(grpc_server)));
    972 
    973   gpr_mu_init(&server->mu_global);
    974   gpr_mu_init(&server->mu_call);
    975   gpr_cv_init(&server->starting_cv);
    976 
    977   /* decremented by grpc_server_destroy */
    978   gpr_ref_init(&server->internal_refcount, 1);
    979   server->root_channel_data.next = server->root_channel_data.prev =
    980       &server->root_channel_data;
    981 
    982   server->channel_args = grpc_channel_args_copy(args);
    983 
    984   const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_ENABLE_CHANNELZ);
    985   if (grpc_channel_arg_get_bool(arg, false)) {
    986     arg = grpc_channel_args_find(args,
    987                                  GRPC_ARG_MAX_CHANNEL_TRACE_EVENTS_PER_NODE);
    988     size_t trace_events_per_node =
    989         grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX});
    990     server->channelz_server =
    991         grpc_core::MakeRefCounted<grpc_core::channelz::ServerNode>(
    992             trace_events_per_node);
    993     server->channelz_server->AddTraceEvent(
    994         grpc_core::channelz::ChannelTrace::Severity::Info,
    995         grpc_slice_from_static_string("Server created"));
    996   }
    997 
    998   return server;
    999 }
   1000 
   1001 static int streq(const char* a, const char* b) {
   1002   if (a == nullptr && b == nullptr) return 1;
   1003   if (a == nullptr) return 0;
   1004   if (b == nullptr) return 0;
   1005   return 0 == strcmp(a, b);
   1006 }
   1007 
   1008 void* grpc_server_register_method(
   1009     grpc_server* server, const char* method, const char* host,
   1010     grpc_server_register_method_payload_handling payload_handling,
   1011     uint32_t flags) {
   1012   registered_method* m;
   1013   GRPC_API_TRACE(
   1014       "grpc_server_register_method(server=%p, method=%s, host=%s, "
   1015       "flags=0x%08x)",
   1016       4, (server, method, host, flags));
   1017   if (!method) {
   1018     gpr_log(GPR_ERROR,
   1019             "grpc_server_register_method method string cannot be NULL");
   1020     return nullptr;
   1021   }
   1022   for (m = server->registered_methods; m; m = m->next) {
   1023     if (streq(m->method, method) && streq(m->host, host)) {
   1024       gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method,
   1025               host ? host : "*");
   1026       return nullptr;
   1027     }
   1028   }
   1029   if ((flags & ~GRPC_INITIAL_METADATA_USED_MASK) != 0) {
   1030     gpr_log(GPR_ERROR, "grpc_server_register_method invalid flags 0x%08x",
   1031             flags);
   1032     return nullptr;
   1033   }
   1034   m = static_cast<registered_method*>(gpr_zalloc(sizeof(registered_method)));
   1035   m->method = gpr_strdup(method);
   1036   m->host = gpr_strdup(host);
   1037   m->next = server->registered_methods;
   1038   m->payload_handling = payload_handling;
   1039   m->flags = flags;
   1040   server->registered_methods = m;
   1041   return m;
   1042 }
   1043 
   1044 static void start_listeners(void* s, grpc_error* error) {
   1045   grpc_server* server = static_cast<grpc_server*>(s);
   1046   for (listener* l = server->listeners; l; l = l->next) {
   1047     l->start(server, l->arg, server->pollsets, server->pollset_count);
   1048   }
   1049 
   1050   gpr_mu_lock(&server->mu_global);
   1051   server->starting = false;
   1052   gpr_cv_signal(&server->starting_cv);
   1053   gpr_mu_unlock(&server->mu_global);
   1054 
   1055   server_unref(server);
   1056 }
   1057 
   1058 void grpc_server_start(grpc_server* server) {
   1059   size_t i;
   1060   grpc_core::ExecCtx exec_ctx;
   1061 
   1062   GRPC_API_TRACE("grpc_server_start(server=%p)", 1, (server));
   1063 
   1064   server->started = true;
   1065   server->pollset_count = 0;
   1066   server->pollsets = static_cast<grpc_pollset**>(
   1067       gpr_malloc(sizeof(grpc_pollset*) * server->cq_count));
   1068   for (i = 0; i < server->cq_count; i++) {
   1069     if (grpc_cq_can_listen(server->cqs[i])) {
   1070       server->pollsets[server->pollset_count++] =
   1071           grpc_cq_pollset(server->cqs[i]);
   1072     }
   1073   }
   1074   request_matcher_init(&server->unregistered_request_matcher, server);
   1075   for (registered_method* rm = server->registered_methods; rm; rm = rm->next) {
   1076     request_matcher_init(&rm->matcher, server);
   1077   }
   1078 
   1079   server_ref(server);
   1080   server->starting = true;
   1081   GRPC_CLOSURE_SCHED(
   1082       GRPC_CLOSURE_CREATE(start_listeners, server,
   1083                           grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)),
   1084       GRPC_ERROR_NONE);
   1085 }
   1086 
   1087 void grpc_server_get_pollsets(grpc_server* server, grpc_pollset*** pollsets,
   1088                               size_t* pollset_count) {
   1089   *pollset_count = server->pollset_count;
   1090   *pollsets = server->pollsets;
   1091 }
   1092 
   1093 void grpc_server_setup_transport(grpc_server* s, grpc_transport* transport,
   1094                                  grpc_pollset* accepting_pollset,
   1095                                  const grpc_channel_args* args) {
   1096   size_t num_registered_methods;
   1097   size_t alloc;
   1098   registered_method* rm;
   1099   channel_registered_method* crm;
   1100   grpc_channel* channel;
   1101   channel_data* chand;
   1102   uint32_t hash;
   1103   size_t slots;
   1104   uint32_t probes;
   1105   uint32_t max_probes = 0;
   1106   grpc_transport_op* op = nullptr;
   1107 
   1108   channel = grpc_channel_create(nullptr, args, GRPC_SERVER_CHANNEL, transport);
   1109   chand = static_cast<channel_data*>(
   1110       grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0)
   1111           ->channel_data);
   1112   chand->server = s;
   1113   server_ref(s);
   1114   chand->channel = channel;
   1115 
   1116   size_t cq_idx;
   1117   for (cq_idx = 0; cq_idx < s->cq_count; cq_idx++) {
   1118     if (grpc_cq_pollset(s->cqs[cq_idx]) == accepting_pollset) break;
   1119   }
   1120   if (cq_idx == s->cq_count) {
   1121     /* completion queue not found: pick a random one to publish new calls to */
   1122     cq_idx = static_cast<size_t>(rand()) % s->cq_count;
   1123   }
   1124   chand->cq_idx = cq_idx;
   1125 
   1126   num_registered_methods = 0;
   1127   for (rm = s->registered_methods; rm; rm = rm->next) {
   1128     num_registered_methods++;
   1129   }
   1130   /* build a lookup table phrased in terms of mdstr's in this channels context
   1131      to quickly find registered methods */
   1132   if (num_registered_methods > 0) {
   1133     slots = 2 * num_registered_methods;
   1134     alloc = sizeof(channel_registered_method) * slots;
   1135     chand->registered_methods =
   1136         static_cast<channel_registered_method*>(gpr_zalloc(alloc));
   1137     for (rm = s->registered_methods; rm; rm = rm->next) {
   1138       grpc_slice host;
   1139       bool has_host;
   1140       grpc_slice method;
   1141       if (rm->host != nullptr) {
   1142         host = grpc_slice_intern(grpc_slice_from_static_string(rm->host));
   1143         has_host = true;
   1144       } else {
   1145         has_host = false;
   1146       }
   1147       method = grpc_slice_intern(grpc_slice_from_static_string(rm->method));
   1148       hash = GRPC_MDSTR_KV_HASH(has_host ? grpc_slice_hash(host) : 0,
   1149                                 grpc_slice_hash(method));
   1150       for (probes = 0; chand->registered_methods[(hash + probes) % slots]
   1151                            .server_registered_method != nullptr;
   1152            probes++)
   1153         ;
   1154       if (probes > max_probes) max_probes = probes;
   1155       crm = &chand->registered_methods[(hash + probes) % slots];
   1156       crm->server_registered_method = rm;
   1157       crm->flags = rm->flags;
   1158       crm->has_host = has_host;
   1159       if (has_host) {
   1160         crm->host = host;
   1161       }
   1162       crm->method = method;
   1163     }
   1164     GPR_ASSERT(slots <= UINT32_MAX);
   1165     chand->registered_method_slots = static_cast<uint32_t>(slots);
   1166     chand->registered_method_max_probes = max_probes;
   1167   }
   1168 
   1169   gpr_mu_lock(&s->mu_global);
   1170   chand->next = &s->root_channel_data;
   1171   chand->prev = chand->next->prev;
   1172   chand->next->prev = chand->prev->next = chand;
   1173   gpr_mu_unlock(&s->mu_global);
   1174 
   1175   GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity");
   1176   op = grpc_make_transport_op(nullptr);
   1177   op->set_accept_stream = true;
   1178   op->set_accept_stream_fn = accept_stream;
   1179   op->set_accept_stream_user_data = chand;
   1180   op->on_connectivity_state_change = &chand->channel_connectivity_changed;
   1181   op->connectivity_state = &chand->connectivity_state;
   1182   if (gpr_atm_acq_load(&s->shutdown_flag) != 0) {
   1183     op->disconnect_with_error =
   1184         GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown");
   1185   }
   1186   grpc_transport_perform_op(transport, op);
   1187 }
   1188 
   1189 void done_published_shutdown(void* done_arg, grpc_cq_completion* storage) {
   1190   (void)done_arg;
   1191   gpr_free(storage);
   1192 }
   1193 
   1194 static void listener_destroy_done(void* s, grpc_error* error) {
   1195   grpc_server* server = static_cast<grpc_server*>(s);
   1196   gpr_mu_lock(&server->mu_global);
   1197   server->listeners_destroyed++;
   1198   maybe_finish_shutdown(server);
   1199   gpr_mu_unlock(&server->mu_global);
   1200 }
   1201 
   1202 /*
   1203   - Kills all pending requests-for-incoming-RPC-calls (i.e the requests made via
   1204     grpc_server_request_call and grpc_server_request_registered call will now be
   1205     cancelled). See 'kill_pending_work_locked()'
   1206 
   1207   - Shuts down the listeners (i.e the server will no longer listen on the port
   1208     for new incoming channels).
   1209 
   1210   - Iterates through all channels on the server and sends shutdown msg (see
   1211     'channel_broadcaster_shutdown()' for details) to the clients via the
   1212     transport layer. The transport layer then guarantees the following:
   1213      -- Sends shutdown to the client (for eg: HTTP2 transport sends GOAWAY)
   1214      -- If the server has outstanding calls that are in the process, the
   1215         connection is NOT closed until the server is done with all those calls
   1216      -- Once, there are no more calls in progress, the channel is closed
   1217  */
   1218 void grpc_server_shutdown_and_notify(grpc_server* server,
   1219                                      grpc_completion_queue* cq, void* tag) {
   1220   listener* l;
   1221   shutdown_tag* sdt;
   1222   channel_broadcaster broadcaster;
   1223   grpc_core::ExecCtx exec_ctx;
   1224 
   1225   GRPC_API_TRACE("grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", 3,
   1226                  (server, cq, tag));
   1227 
   1228   /* wait for startup to be finished: locks mu_global */
   1229   gpr_mu_lock(&server->mu_global);
   1230   while (server->starting) {
   1231     gpr_cv_wait(&server->starting_cv, &server->mu_global,
   1232                 gpr_inf_future(GPR_CLOCK_MONOTONIC));
   1233   }
   1234 
   1235   /* stay locked, and gather up some stuff to do */
   1236   GPR_ASSERT(grpc_cq_begin_op(cq, tag));
   1237   if (server->shutdown_published) {
   1238     grpc_cq_end_op(cq, tag, GRPC_ERROR_NONE, done_published_shutdown, nullptr,
   1239                    static_cast<grpc_cq_completion*>(
   1240                        gpr_malloc(sizeof(grpc_cq_completion))));
   1241     gpr_mu_unlock(&server->mu_global);
   1242     return;
   1243   }
   1244   server->shutdown_tags = static_cast<shutdown_tag*>(
   1245       gpr_realloc(server->shutdown_tags,
   1246                   sizeof(shutdown_tag) * (server->num_shutdown_tags + 1)));
   1247   sdt = &server->shutdown_tags[server->num_shutdown_tags++];
   1248   sdt->tag = tag;
   1249   sdt->cq = cq;
   1250   if (gpr_atm_acq_load(&server->shutdown_flag)) {
   1251     gpr_mu_unlock(&server->mu_global);
   1252     return;
   1253   }
   1254 
   1255   server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
   1256 
   1257   channel_broadcaster_init(server, &broadcaster);
   1258 
   1259   gpr_atm_rel_store(&server->shutdown_flag, 1);
   1260 
   1261   /* collect all unregistered then registered calls */
   1262   gpr_mu_lock(&server->mu_call);
   1263   kill_pending_work_locked(
   1264       server, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
   1265   gpr_mu_unlock(&server->mu_call);
   1266 
   1267   maybe_finish_shutdown(server);
   1268   gpr_mu_unlock(&server->mu_global);
   1269 
   1270   /* Shutdown listeners */
   1271   for (l = server->listeners; l; l = l->next) {
   1272     GRPC_CLOSURE_INIT(&l->destroy_done, listener_destroy_done, server,
   1273                       grpc_schedule_on_exec_ctx);
   1274     l->destroy(server, l->arg, &l->destroy_done);
   1275   }
   1276 
   1277   channel_broadcaster_shutdown(&broadcaster, true /* send_goaway */,
   1278                                GRPC_ERROR_NONE);
   1279 }
   1280 
   1281 void grpc_server_cancel_all_calls(grpc_server* server) {
   1282   channel_broadcaster broadcaster;
   1283   grpc_core::ExecCtx exec_ctx;
   1284 
   1285   GRPC_API_TRACE("grpc_server_cancel_all_calls(server=%p)", 1, (server));
   1286 
   1287   gpr_mu_lock(&server->mu_global);
   1288   channel_broadcaster_init(server, &broadcaster);
   1289   gpr_mu_unlock(&server->mu_global);
   1290 
   1291   channel_broadcaster_shutdown(
   1292       &broadcaster, false /* send_goaway */,
   1293       GRPC_ERROR_CREATE_FROM_STATIC_STRING("Cancelling all calls"));
   1294 }
   1295 
   1296 void grpc_server_destroy(grpc_server* server) {
   1297   listener* l;
   1298   grpc_core::ExecCtx exec_ctx;
   1299 
   1300   GRPC_API_TRACE("grpc_server_destroy(server=%p)", 1, (server));
   1301 
   1302   gpr_mu_lock(&server->mu_global);
   1303   GPR_ASSERT(gpr_atm_acq_load(&server->shutdown_flag) || !server->listeners);
   1304   GPR_ASSERT(server->listeners_destroyed == num_listeners(server));
   1305 
   1306   while (server->listeners) {
   1307     l = server->listeners;
   1308     server->listeners = l->next;
   1309     gpr_free(l);
   1310   }
   1311 
   1312   gpr_mu_unlock(&server->mu_global);
   1313 
   1314   server_unref(server);
   1315 }
   1316 
   1317 void grpc_server_add_listener(grpc_server* server, void* arg,
   1318                               void (*start)(grpc_server* server, void* arg,
   1319                                             grpc_pollset** pollsets,
   1320                                             size_t pollset_count),
   1321                               void (*destroy)(grpc_server* server, void* arg,
   1322                                               grpc_closure* on_done)) {
   1323   listener* l = static_cast<listener*>(gpr_malloc(sizeof(listener)));
   1324   l->arg = arg;
   1325   l->start = start;
   1326   l->destroy = destroy;
   1327   l->next = server->listeners;
   1328   server->listeners = l;
   1329 }
   1330 
   1331 static grpc_call_error queue_call_request(grpc_server* server, size_t cq_idx,
   1332                                           requested_call* rc) {
   1333   call_data* calld = nullptr;
   1334   request_matcher* rm = nullptr;
   1335   if (gpr_atm_acq_load(&server->shutdown_flag)) {
   1336     fail_call(server, cq_idx, rc,
   1337               GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
   1338     return GRPC_CALL_OK;
   1339   }
   1340   switch (rc->type) {
   1341     case BATCH_CALL:
   1342       rm = &server->unregistered_request_matcher;
   1343       break;
   1344     case REGISTERED_CALL:
   1345       rm = &rc->data.registered.method->matcher;
   1346       break;
   1347   }
   1348   if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) {
   1349     /* this was the first queued request: we need to lock and start
   1350        matching calls */
   1351     gpr_mu_lock(&server->mu_call);
   1352     while ((calld = rm->pending_head) != nullptr) {
   1353       rc = reinterpret_cast<requested_call*>(
   1354           gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]));
   1355       if (rc == nullptr) break;
   1356       rm->pending_head = calld->pending_next;
   1357       gpr_mu_unlock(&server->mu_call);
   1358       if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) {
   1359         // Zombied Call
   1360         GRPC_CLOSURE_INIT(
   1361             &calld->kill_zombie_closure, kill_zombie,
   1362             grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
   1363             grpc_schedule_on_exec_ctx);
   1364         GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
   1365       } else {
   1366         publish_call(server, calld, cq_idx, rc);
   1367       }
   1368       gpr_mu_lock(&server->mu_call);
   1369     }
   1370     gpr_mu_unlock(&server->mu_call);
   1371   }
   1372   return GRPC_CALL_OK;
   1373 }
   1374 
   1375 grpc_call_error grpc_server_request_call(
   1376     grpc_server* server, grpc_call** call, grpc_call_details* details,
   1377     grpc_metadata_array* initial_metadata,
   1378     grpc_completion_queue* cq_bound_to_call,
   1379     grpc_completion_queue* cq_for_notification, void* tag) {
   1380   grpc_call_error error;
   1381   grpc_core::ExecCtx exec_ctx;
   1382   requested_call* rc = static_cast<requested_call*>(gpr_malloc(sizeof(*rc)));
   1383   GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
   1384   GRPC_API_TRACE(
   1385       "grpc_server_request_call("
   1386       "server=%p, call=%p, details=%p, initial_metadata=%p, "
   1387       "cq_bound_to_call=%p, cq_for_notification=%p, tag=%p)",
   1388       7,
   1389       (server, call, details, initial_metadata, cq_bound_to_call,
   1390        cq_for_notification, tag));
   1391   size_t cq_idx;
   1392   for (cq_idx = 0; cq_idx < server->cq_count; cq_idx++) {
   1393     if (server->cqs[cq_idx] == cq_for_notification) {
   1394       break;
   1395     }
   1396   }
   1397   if (cq_idx == server->cq_count) {
   1398     gpr_free(rc);
   1399     error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
   1400     goto done;
   1401   }
   1402   if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
   1403     gpr_free(rc);
   1404     error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
   1405     goto done;
   1406   }
   1407   details->reserved = nullptr;
   1408   rc->cq_idx = cq_idx;
   1409   rc->type = BATCH_CALL;
   1410   rc->server = server;
   1411   rc->tag = tag;
   1412   rc->cq_bound_to_call = cq_bound_to_call;
   1413   rc->call = call;
   1414   rc->data.batch.details = details;
   1415   rc->initial_metadata = initial_metadata;
   1416   error = queue_call_request(server, cq_idx, rc);
   1417 done:
   1418 
   1419   return error;
   1420 }
   1421 
   1422 grpc_call_error grpc_server_request_registered_call(
   1423     grpc_server* server, void* rmp, grpc_call** call, gpr_timespec* deadline,
   1424     grpc_metadata_array* initial_metadata, grpc_byte_buffer** optional_payload,
   1425     grpc_completion_queue* cq_bound_to_call,
   1426     grpc_completion_queue* cq_for_notification, void* tag) {
   1427   grpc_call_error error;
   1428   grpc_core::ExecCtx exec_ctx;
   1429   requested_call* rc = static_cast<requested_call*>(gpr_malloc(sizeof(*rc)));
   1430   registered_method* rm = static_cast<registered_method*>(rmp);
   1431   GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
   1432   GRPC_API_TRACE(
   1433       "grpc_server_request_registered_call("
   1434       "server=%p, rmp=%p, call=%p, deadline=%p, initial_metadata=%p, "
   1435       "optional_payload=%p, cq_bound_to_call=%p, cq_for_notification=%p, "
   1436       "tag=%p)",
   1437       9,
   1438       (server, rmp, call, deadline, initial_metadata, optional_payload,
   1439        cq_bound_to_call, cq_for_notification, tag));
   1440 
   1441   size_t cq_idx;
   1442   for (cq_idx = 0; cq_idx < server->cq_count; cq_idx++) {
   1443     if (server->cqs[cq_idx] == cq_for_notification) {
   1444       break;
   1445     }
   1446   }
   1447   if (cq_idx == server->cq_count) {
   1448     gpr_free(rc);
   1449     error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
   1450     goto done;
   1451   }
   1452   if ((optional_payload == nullptr) !=
   1453       (rm->payload_handling == GRPC_SRM_PAYLOAD_NONE)) {
   1454     gpr_free(rc);
   1455     error = GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
   1456     goto done;
   1457   }
   1458   if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
   1459     gpr_free(rc);
   1460     error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
   1461     goto done;
   1462   }
   1463   rc->cq_idx = cq_idx;
   1464   rc->type = REGISTERED_CALL;
   1465   rc->server = server;
   1466   rc->tag = tag;
   1467   rc->cq_bound_to_call = cq_bound_to_call;
   1468   rc->call = call;
   1469   rc->data.registered.method = rm;
   1470   rc->data.registered.deadline = deadline;
   1471   rc->initial_metadata = initial_metadata;
   1472   rc->data.registered.optional_payload = optional_payload;
   1473   error = queue_call_request(server, cq_idx, rc);
   1474 done:
   1475 
   1476   return error;
   1477 }
   1478 
   1479 static void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc,
   1480                       grpc_error* error) {
   1481   *rc->call = nullptr;
   1482   rc->initial_metadata->count = 0;
   1483   GPR_ASSERT(error != GRPC_ERROR_NONE);
   1484 
   1485   grpc_cq_end_op(server->cqs[cq_idx], rc->tag, error, done_request_event, rc,
   1486                  &rc->completion);
   1487 }
   1488 
   1489 const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server) {
   1490   return server->channel_args;
   1491 }
   1492 
   1493 int grpc_server_has_open_connections(grpc_server* server) {
   1494   int r;
   1495   gpr_mu_lock(&server->mu_global);
   1496   r = server->root_channel_data.next != &server->root_channel_data;
   1497   gpr_mu_unlock(&server->mu_global);
   1498   return r;
   1499 }
   1500 
   1501 grpc_core::channelz::ServerNode* grpc_server_get_channelz_node(
   1502     grpc_server* server) {
   1503   if (server == nullptr) {
   1504     return nullptr;
   1505   }
   1506   return server->channelz_server.get();
   1507 }
   1508