      1 /*
      2  *
      3  * Copyright 2015-2016 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 #include <grpc/support/port_platform.h>
     19 
     20 #include "src/core/lib/surface/completion_queue.h"
     21 
     22 #include <inttypes.h>
     23 #include <stdio.h>
     24 #include <string.h>
     25 
     26 #include <grpc/support/alloc.h>
     27 #include <grpc/support/atm.h>
     28 #include <grpc/support/log.h>
     29 #include <grpc/support/string_util.h>
     30 #include <grpc/support/time.h>
     31 
     32 #include "src/core/lib/debug/stats.h"
     33 #include "src/core/lib/gpr/spinlock.h"
     34 #include "src/core/lib/gpr/string.h"
     35 #include "src/core/lib/gpr/tls.h"
     36 #include "src/core/lib/iomgr/pollset.h"
     37 #include "src/core/lib/iomgr/timer.h"
     38 #include "src/core/lib/profiling/timers.h"
     39 #include "src/core/lib/surface/api_trace.h"
     40 #include "src/core/lib/surface/call.h"
     41 #include "src/core/lib/surface/event_string.h"
     42 
     43 grpc_core::TraceFlag grpc_trace_operation_failures(false, "op_failure");
     44 grpc_core::DebugOnlyTraceFlag grpc_trace_pending_tags(false, "pending_tags");
     45 grpc_core::DebugOnlyTraceFlag grpc_trace_cq_refcount(false, "cq_refcount");
     46 
     47 // Specifies a cq thread local cache.
     48 // The first event that occurs on a thread
     49 // with a cq cache will go into that cache, and
     50 // will only be returned on the thread that initialized the cache.
     51 // NOTE: Only one event will ever be cached.
     52 GPR_TLS_DECL(g_cached_event);
     53 GPR_TLS_DECL(g_cached_cq);
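// Illustrative usage sketch of this thread-local cache (not a new API; it
// uses only the helper functions defined later in this file):
//
//   grpc_completion_queue_thread_local_cache_init(cq);
//   /* ... drive work on this thread that completes an op against cq ... */
//   void* tag;
//   int ok;
//   if (grpc_completion_queue_thread_local_cache_flush(cq, &tag, &ok)) {
//     /* The single cached event was returned without touching the queue. */
//   }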
     54 
     55 typedef struct {
     56   grpc_pollset_worker** worker;
     57   void* tag;
     58 } plucker;
     59 
     60 typedef struct {
     61   bool can_get_pollset;
     62   bool can_listen;
     63   size_t (*size)(void);
     64   void (*init)(grpc_pollset* pollset, gpr_mu** mu);
     65   grpc_error* (*kick)(grpc_pollset* pollset,
     66                       grpc_pollset_worker* specific_worker);
     67   grpc_error* (*work)(grpc_pollset* pollset, grpc_pollset_worker** worker,
     68                       grpc_millis deadline);
     69   void (*shutdown)(grpc_pollset* pollset, grpc_closure* closure);
     70   void (*destroy)(grpc_pollset* pollset);
     71 } cq_poller_vtable;
     72 
     73 typedef struct non_polling_worker {
     74   gpr_cv cv;
     75   bool kicked;
     76   struct non_polling_worker* next;
     77   struct non_polling_worker* prev;
     78 } non_polling_worker;
     79 
     80 typedef struct {
     81   gpr_mu mu;
     82   non_polling_worker* root;
     83   grpc_closure* shutdown;
     84 } non_polling_poller;
     85 
     86 static size_t non_polling_poller_size(void) {
     87   return sizeof(non_polling_poller);
     88 }
     89 
     90 static void non_polling_poller_init(grpc_pollset* pollset, gpr_mu** mu) {
     91   non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
     92   gpr_mu_init(&npp->mu);
     93   *mu = &npp->mu;
     94 }
     95 
     96 static void non_polling_poller_destroy(grpc_pollset* pollset) {
     97   non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
     98   gpr_mu_destroy(&npp->mu);
     99 }
    100 
    101 static grpc_error* non_polling_poller_work(grpc_pollset* pollset,
    102                                            grpc_pollset_worker** worker,
    103                                            grpc_millis deadline) {
    104   non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
    105   if (npp->shutdown) return GRPC_ERROR_NONE;
    106   non_polling_worker w;
    107   gpr_cv_init(&w.cv);
    108   if (worker != nullptr) *worker = reinterpret_cast<grpc_pollset_worker*>(&w);
    109   if (npp->root == nullptr) {
    110     npp->root = w.next = w.prev = &w;
    111   } else {
    112     w.next = npp->root;
    113     w.prev = w.next->prev;
    114     w.next->prev = w.prev->next = &w;
    115   }
    116   w.kicked = false;
    117   gpr_timespec deadline_ts =
    118       grpc_millis_to_timespec(deadline, GPR_CLOCK_MONOTONIC);
    119   while (!npp->shutdown && !w.kicked &&
    120          !gpr_cv_wait(&w.cv, &npp->mu, deadline_ts))
    121     ;
    122   grpc_core::ExecCtx::Get()->InvalidateNow();
    123   if (&w == npp->root) {
    124     npp->root = w.next;
    125     if (&w == npp->root) {
    126       if (npp->shutdown) {
    127         GRPC_CLOSURE_SCHED(npp->shutdown, GRPC_ERROR_NONE);
    128       }
    129       npp->root = nullptr;
    130     }
    131   }
    132   w.next->prev = w.prev;
    133   w.prev->next = w.next;
    134   gpr_cv_destroy(&w.cv);
    135   if (worker != nullptr) *worker = nullptr;
    136   return GRPC_ERROR_NONE;
    137 }
    138 
    139 static grpc_error* non_polling_poller_kick(
    140     grpc_pollset* pollset, grpc_pollset_worker* specific_worker) {
    141   non_polling_poller* p = reinterpret_cast<non_polling_poller*>(pollset);
    142   if (specific_worker == nullptr)
    143     specific_worker = reinterpret_cast<grpc_pollset_worker*>(p->root);
    144   if (specific_worker != nullptr) {
    145     non_polling_worker* w =
    146         reinterpret_cast<non_polling_worker*>(specific_worker);
    147     if (!w->kicked) {
    148       w->kicked = true;
    149       gpr_cv_signal(&w->cv);
    150     }
    151   }
    152   return GRPC_ERROR_NONE;
    153 }
    154 
    155 static void non_polling_poller_shutdown(grpc_pollset* pollset,
    156                                         grpc_closure* closure) {
    157   non_polling_poller* p = reinterpret_cast<non_polling_poller*>(pollset);
    158   GPR_ASSERT(closure != nullptr);
    159   p->shutdown = closure;
    160   if (p->root == nullptr) {
    161     GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
    162   } else {
    163     non_polling_worker* w = p->root;
    164     do {
    165       gpr_cv_signal(&w->cv);
    166       w = w->next;
    167     } while (w != p->root);
    168   }
    169 }
    170 
    171 static const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
    172     /* GRPC_CQ_DEFAULT_POLLING */
    173     {true, true, grpc_pollset_size, grpc_pollset_init, grpc_pollset_kick,
    174      grpc_pollset_work, grpc_pollset_shutdown, grpc_pollset_destroy},
    175     /* GRPC_CQ_NON_LISTENING */
    176     {true, false, grpc_pollset_size, grpc_pollset_init, grpc_pollset_kick,
    177      grpc_pollset_work, grpc_pollset_shutdown, grpc_pollset_destroy},
    178     /* GRPC_CQ_NON_POLLING */
    179     {false, false, non_polling_poller_size, non_polling_poller_init,
    180      non_polling_poller_kick, non_polling_poller_work,
    181      non_polling_poller_shutdown, non_polling_poller_destroy},
    182 };
    183 
    184 typedef struct cq_vtable {
    185   grpc_cq_completion_type cq_completion_type;
    186   size_t data_size;
    187   void (*init)(void* data,
    188                grpc_experimental_completion_queue_functor* shutdown_callback);
    189   void (*shutdown)(grpc_completion_queue* cq);
    190   void (*destroy)(void* data);
    191   bool (*begin_op)(grpc_completion_queue* cq, void* tag);
    192   void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error* error,
    193                  void (*done)(void* done_arg, grpc_cq_completion* storage),
    194                  void* done_arg, grpc_cq_completion* storage);
    195   grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline,
    196                      void* reserved);
    197   grpc_event (*pluck)(grpc_completion_queue* cq, void* tag,
    198                       gpr_timespec deadline, void* reserved);
    199 } cq_vtable;
    200 
    201 /* Queue that holds grpc_cq_completion events. Internally uses a gpr_mpscq
    202  * queue (a lock-free multi-producer, single-consumer queue). It uses a
    203  * queue_lock to support multiple consumers.
    204  * Only used in completion queues whose completion_type is GRPC_CQ_NEXT. */
    205 typedef struct grpc_cq_event_queue {
    206   /* Spinlock to serialize consumers, i.e. pop() operations */
    207   gpr_spinlock queue_lock;
    208 
    209   gpr_mpscq queue;
    210 
    211   /* A lazy counter of the number of items in the queue. It is NOT atomically
    212      incremented/decremented along with push/pop operations and hence is only
    213      eventually consistent. */
    214   gpr_atm num_queue_items;
    215 } grpc_cq_event_queue;
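
/* Illustrative sketch of how this queue is used elsewhere in this file:
 * producers push completions lock-free, while consumers serialize on the
 * spinlock and must tolerate a transiently-inconsistent nullptr result.
 *
 *   cq_event_queue_push(&cqd->queue, storage);   // any producer thread
 *
 *   grpc_cq_completion* c = cq_event_queue_pop(&cqd->queue);
 *   if (c == nullptr && cq_event_queue_num_items(&cqd->queue) > 0) {
 *     // queue was not actually empty; retry (see cq_next() below)
 *   }
 */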
    216 
    217 typedef struct cq_next_data {
    218   /** Completed events for completion-queues of type GRPC_CQ_NEXT */
    219   grpc_cq_event_queue queue;
    220 
    221   /** Counter of how many things have ever been queued on this completion queue;
    222       useful for avoiding locks when checking the queue */
    223   gpr_atm things_queued_ever;
    224 
    225   /* Number of outstanding events (+1 if not shut down) */
    226   gpr_atm pending_events;
    227 
    228   /** 0 initially. 1 once we initiated shutdown */
    229   bool shutdown_called;
    230 } cq_next_data;
    231 
    232 typedef struct cq_pluck_data {
    233   /** Completed events for completion-queues of type GRPC_CQ_PLUCK */
    234   grpc_cq_completion completed_head;
    235   grpc_cq_completion* completed_tail;
    236 
    237   /** Number of pending events (+1 if we're not shut down) */
    238   gpr_atm pending_events;
    239 
    240   /** Counter of how many things have ever been queued on this completion queue;
    241       useful for avoiding locks when checking the queue */
    242   gpr_atm things_queued_ever;
    243 
    244   /** 0 initially. 1 once shutdown has completed */
    245   /* TODO: (sreek) This is not needed since (shutdown == 1) if and only if
    246    * (pending_events == 0). So consider removing this in the future and using
    247    * pending_events instead */
    248   gpr_atm shutdown;
    249 
    250   /** 0 initially. 1 once we initiated shutdown */
    251   bool shutdown_called;
    252 
    253   int num_pluckers;
    254   plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
    255 } cq_pluck_data;
    256 
    257 typedef struct cq_callback_data {
    258   /** No actual completed events queue, unlike other types */
    259 
    260   /** Number of pending events (+1 if we're not shut down) */
    261   gpr_atm pending_events;
    262 
    263   /** Counter of how many things have ever been queued on this completion queue;
    264       useful for avoiding locks when checking the queue */
    265   gpr_atm things_queued_ever;
    266 
    267   /** 0 initially. 1 once we initiated shutdown */
    268   bool shutdown_called;
    269 
    270   /** A callback that gets invoked when the CQ completes shutdown */
    271   grpc_experimental_completion_queue_functor* shutdown_callback;
    272 } cq_callback_data;
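
/* Illustrative note: for a GRPC_CQ_CALLBACK queue the "tag" passed to
 * grpc_cq_end_op is itself a grpc_experimental_completion_queue_functor, and
 * cq_end_op_for_callback() below runs it instead of queuing an event:
 *
 *   auto* functor =
 *       static_cast<grpc_experimental_completion_queue_functor*>(tag);
 *   (*functor->functor_run)(functor, is_success);
 */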
    273 
    274 /* Completion queue structure */
    275 struct grpc_completion_queue {
    276   /** Once owning_refs drops to zero, we will destroy the cq */
    277   gpr_refcount owning_refs;
    278 
    279   gpr_mu* mu;
    280 
    281   const cq_vtable* vtable;
    282   const cq_poller_vtable* poller_vtable;
    283 
    284 #ifndef NDEBUG
    285   void** outstanding_tags;
    286   size_t outstanding_tag_count;
    287   size_t outstanding_tag_capacity;
    288 #endif
    289 
    290   grpc_closure pollset_shutdown_done;
    291   int num_polls;
    292 };
    293 
    294 /* Forward declarations */
    295 static void cq_finish_shutdown_next(grpc_completion_queue* cq);
    296 static void cq_finish_shutdown_pluck(grpc_completion_queue* cq);
    297 static void cq_finish_shutdown_callback(grpc_completion_queue* cq);
    298 static void cq_shutdown_next(grpc_completion_queue* cq);
    299 static void cq_shutdown_pluck(grpc_completion_queue* cq);
    300 static void cq_shutdown_callback(grpc_completion_queue* cq);
    301 
    302 static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag);
    303 static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag);
    304 static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag);
    305 
    306 // A cq_end_op function is called when an operation on a given CQ with
    307 // a given tag has completed. The storage argument is a reference to the
    308 // space reserved for this completion as it is placed into the corresponding
    309 // queue. The done argument is a callback that will be invoked when it is
    310 // safe to free up that storage. The storage MUST NOT be freed until the
    311 // done callback is invoked.
    312 static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
    313                                grpc_error* error,
    314                                void (*done)(void* done_arg,
    315                                             grpc_cq_completion* storage),
    316                                void* done_arg, grpc_cq_completion* storage);
    317 
    318 static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag,
    319                                 grpc_error* error,
    320                                 void (*done)(void* done_arg,
    321                                              grpc_cq_completion* storage),
    322                                 void* done_arg, grpc_cq_completion* storage);
    323 
    324 static void cq_end_op_for_callback(grpc_completion_queue* cq, void* tag,
    325                                    grpc_error* error,
    326                                    void (*done)(void* done_arg,
    327                                                 grpc_cq_completion* storage),
    328                                    void* done_arg, grpc_cq_completion* storage);
    329 
    330 static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
    331                           void* reserved);
    332 
    333 static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
    334                            gpr_timespec deadline, void* reserved);
    335 
    336 // Note that cq_init_next and cq_init_pluck do not use the shutdown_callback
    337 static void cq_init_next(
    338     void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
    339 static void cq_init_pluck(
    340     void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
    341 static void cq_init_callback(
    342     void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
    343 static void cq_destroy_next(void* data);
    344 static void cq_destroy_pluck(void* data);
    345 static void cq_destroy_callback(void* data);
    346 
    347 /* Completion queue vtables based on the completion-type */
    348 static const cq_vtable g_cq_vtable[] = {
    349     /* GRPC_CQ_NEXT */
    350     {GRPC_CQ_NEXT, sizeof(cq_next_data), cq_init_next, cq_shutdown_next,
    351      cq_destroy_next, cq_begin_op_for_next, cq_end_op_for_next, cq_next,
    352      nullptr},
    353     /* GRPC_CQ_PLUCK */
    354     {GRPC_CQ_PLUCK, sizeof(cq_pluck_data), cq_init_pluck, cq_shutdown_pluck,
    355      cq_destroy_pluck, cq_begin_op_for_pluck, cq_end_op_for_pluck, nullptr,
    356      cq_pluck},
    357     /* GRPC_CQ_CALLBACK */
    358     {GRPC_CQ_CALLBACK, sizeof(cq_callback_data), cq_init_callback,
    359      cq_shutdown_callback, cq_destroy_callback, cq_begin_op_for_callback,
    360      cq_end_op_for_callback, nullptr, nullptr},
    361 };
    362 
    363 #define DATA_FROM_CQ(cq) ((void*)(cq + 1))
    364 #define POLLSET_FROM_CQ(cq) \
    365   ((grpc_pollset*)(cq->vtable->data_size + (char*)DATA_FROM_CQ(cq)))
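
/* Memory layout assumed by the two macros above: each completion queue is a
 * single allocation laid out as
 *
 *   [grpc_completion_queue][per-type data: vtable->data_size][pollset]
 *
 * matching the gpr_zalloc(sizeof(grpc_completion_queue) + vtable->data_size +
 * poller_vtable->size()) call in grpc_completion_queue_create_internal(). */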
    366 
    367 grpc_core::TraceFlag grpc_cq_pluck_trace(false, "queue_pluck");
    368 
    369 #define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event)                       \
    370   if (grpc_api_trace.enabled() && (grpc_cq_pluck_trace.enabled() ||        \
    371                                    (event)->type != GRPC_QUEUE_TIMEOUT)) { \
    372     char* _ev = grpc_event_string(event);                                  \
    373     gpr_log(GPR_INFO, "RETURN_EVENT[%p]: %s", cq, _ev);                    \
    374     gpr_free(_ev);                                                         \
    375   }
    376 
    377 static void on_pollset_shutdown_done(void* cq, grpc_error* error);
    378 
    379 void grpc_cq_global_init() {
    380   gpr_tls_init(&g_cached_event);
    381   gpr_tls_init(&g_cached_cq);
    382 }
    383 
    384 void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue* cq) {
    385   if ((grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == nullptr) {
    386     gpr_tls_set(&g_cached_event, (intptr_t)0);
    387     gpr_tls_set(&g_cached_cq, (intptr_t)cq);
    388   }
    389 }
    390 
    391 int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq,
    392                                                    void** tag, int* ok) {
    393   grpc_cq_completion* storage =
    394       (grpc_cq_completion*)gpr_tls_get(&g_cached_event);
    395   int ret = 0;
    396   if (storage != nullptr &&
    397       (grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == cq) {
    398     *tag = storage->tag;
    399     grpc_core::ExecCtx exec_ctx;
    400     *ok = (storage->next & static_cast<uintptr_t>(1)) == 1;
    401     storage->done(storage->done_arg, storage);
    402     ret = 1;
    403     cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
    404     if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
    405       GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
    406       gpr_mu_lock(cq->mu);
    407       cq_finish_shutdown_next(cq);
    408       gpr_mu_unlock(cq->mu);
    409       GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
    410     }
    411   }
    412   gpr_tls_set(&g_cached_event, (intptr_t)0);
    413   gpr_tls_set(&g_cached_cq, (intptr_t)0);
    414 
    415   return ret;
    416 }
    417 
    418 static void cq_event_queue_init(grpc_cq_event_queue* q) {
    419   gpr_mpscq_init(&q->queue);
    420   q->queue_lock = GPR_SPINLOCK_INITIALIZER;
    421   gpr_atm_no_barrier_store(&q->num_queue_items, 0);
    422 }
    423 
    424 static void cq_event_queue_destroy(grpc_cq_event_queue* q) {
    425   gpr_mpscq_destroy(&q->queue);
    426 }
    427 
    428 static bool cq_event_queue_push(grpc_cq_event_queue* q, grpc_cq_completion* c) {
    429   gpr_mpscq_push(&q->queue, reinterpret_cast<gpr_mpscq_node*>(c));
    430   return gpr_atm_no_barrier_fetch_add(&q->num_queue_items, 1) == 0;
    431 }
    432 
    433 static grpc_cq_completion* cq_event_queue_pop(grpc_cq_event_queue* q) {
    434   grpc_cq_completion* c = nullptr;
    435 
    436   if (gpr_spinlock_trylock(&q->queue_lock)) {
    437     GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES();
    438 
    439     bool is_empty = false;
    440     c = reinterpret_cast<grpc_cq_completion*>(
    441         gpr_mpscq_pop_and_check_end(&q->queue, &is_empty));
    442     gpr_spinlock_unlock(&q->queue_lock);
    443 
    444     if (c == nullptr && !is_empty) {
    445       GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES();
    446     }
    447   } else {
    448     GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES();
    449   }
    450 
    451   if (c) {
    452     gpr_atm_no_barrier_fetch_add(&q->num_queue_items, -1);
    453   }
    454 
    455   return c;
    456 }
    457 
    458 /* Note: The counter is not incremented/decremented atomically with push/pop.
    459  * The count is only eventually consistent */
    460 static long cq_event_queue_num_items(grpc_cq_event_queue* q) {
    461   return static_cast<long>(gpr_atm_no_barrier_load(&q->num_queue_items));
    462 }
    463 
    464 grpc_completion_queue* grpc_completion_queue_create_internal(
    465     grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
    466     grpc_experimental_completion_queue_functor* shutdown_callback) {
    467   GPR_TIMER_SCOPE("grpc_completion_queue_create_internal", 0);
    468 
    469   grpc_completion_queue* cq;
    470 
    471   GRPC_API_TRACE(
    472       "grpc_completion_queue_create_internal(completion_type=%d, "
    473       "polling_type=%d)",
    474       2, (completion_type, polling_type));
    475 
    476   const cq_vtable* vtable = &g_cq_vtable[completion_type];
    477   const cq_poller_vtable* poller_vtable =
    478       &g_poller_vtable_by_poller_type[polling_type];
    479 
    480   grpc_core::ExecCtx exec_ctx;
    481   GRPC_STATS_INC_CQS_CREATED();
    482 
    483   cq = static_cast<grpc_completion_queue*>(
    484       gpr_zalloc(sizeof(grpc_completion_queue) + vtable->data_size +
    485                  poller_vtable->size()));
    486 
    487   cq->vtable = vtable;
    488   cq->poller_vtable = poller_vtable;
    489 
    490   /* One for destroy(), one for pollset_shutdown */
    491   gpr_ref_init(&cq->owning_refs, 2);
    492 
    493   poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->mu);
    494   vtable->init(DATA_FROM_CQ(cq), shutdown_callback);
    495 
    496   GRPC_CLOSURE_INIT(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq,
    497                     grpc_schedule_on_exec_ctx);
    498   return cq;
    499 }
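
/* Illustrative caller-side sketch of the lifecycle this function supports.
 * The public wrappers (e.g. grpc_completion_queue_create_for_next) live
 * outside this file but funnel into grpc_completion_queue_create_internal():
 *
 *   grpc_completion_queue* cq =
 *       grpc_completion_queue_create_for_next(nullptr);
 *   // ... start operations that will complete against cq ...
 *   grpc_event ev = grpc_completion_queue_next(
 *       cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
 *   grpc_completion_queue_shutdown(cq);
 *   // ... drain until GRPC_QUEUE_SHUTDOWN is returned, then ...
 *   grpc_completion_queue_destroy(cq);
 */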
    500 
    501 static void cq_init_next(
    502     void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
    503   cq_next_data* cqd = static_cast<cq_next_data*>(data);
    504   /* Initial count is dropped by grpc_completion_queue_shutdown */
    505   gpr_atm_no_barrier_store(&cqd->pending_events, 1);
    506   cqd->shutdown_called = false;
    507   gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
    508   cq_event_queue_init(&cqd->queue);
    509 }
    510 
    511 static void cq_destroy_next(void* data) {
    512   cq_next_data* cqd = static_cast<cq_next_data*>(data);
    513   GPR_ASSERT(cq_event_queue_num_items(&cqd->queue) == 0);
    514   cq_event_queue_destroy(&cqd->queue);
    515 }
    516 
    517 static void cq_init_pluck(
    518     void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
    519   cq_pluck_data* cqd = static_cast<cq_pluck_data*>(data);
    520   /* Initial count is dropped by grpc_completion_queue_shutdown */
    521   gpr_atm_no_barrier_store(&cqd->pending_events, 1);
    522   cqd->completed_tail = &cqd->completed_head;
    523   cqd->completed_head.next = (uintptr_t)cqd->completed_tail;
    524   gpr_atm_no_barrier_store(&cqd->shutdown, 0);
    525   cqd->shutdown_called = false;
    526   cqd->num_pluckers = 0;
    527   gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
    528 }
    529 
    530 static void cq_destroy_pluck(void* data) {
    531   cq_pluck_data* cqd = static_cast<cq_pluck_data*>(data);
    532   GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head);
    533 }
    534 
    535 static void cq_init_callback(
    536     void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
    537   cq_callback_data* cqd = static_cast<cq_callback_data*>(data);
    538   /* Initial count is dropped by grpc_completion_queue_shutdown */
    539   gpr_atm_no_barrier_store(&cqd->pending_events, 1);
    540   cqd->shutdown_called = false;
    541   gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
    542   cqd->shutdown_callback = shutdown_callback;
    543 }
    544 
    545 static void cq_destroy_callback(void* data) {}
    546 
    547 grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cq) {
    548   return cq->vtable->cq_completion_type;
    549 }
    550 
    551 int grpc_get_cq_poll_num(grpc_completion_queue* cq) {
    552   int cur_num_polls;
    553   gpr_mu_lock(cq->mu);
    554   cur_num_polls = cq->num_polls;
    555   gpr_mu_unlock(cq->mu);
    556   return cur_num_polls;
    557 }
    558 
    559 #ifndef NDEBUG
    560 void grpc_cq_internal_ref(grpc_completion_queue* cq, const char* reason,
    561                           const char* file, int line) {
    562   if (grpc_trace_cq_refcount.enabled()) {
    563     gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count);
    564     gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
    565             "CQ:%p   ref %" PRIdPTR " -> %" PRIdPTR " %s", cq, val, val + 1,
    566             reason);
    567   }
    568 #else
    569 void grpc_cq_internal_ref(grpc_completion_queue* cq) {
    570 #endif
    571   gpr_ref(&cq->owning_refs);
    572 }
    573 
    574 static void on_pollset_shutdown_done(void* arg, grpc_error* error) {
    575   grpc_completion_queue* cq = static_cast<grpc_completion_queue*>(arg);
    576   GRPC_CQ_INTERNAL_UNREF(cq, "pollset_destroy");
    577 }
    578 
    579 #ifndef NDEBUG
    580 void grpc_cq_internal_unref(grpc_completion_queue* cq, const char* reason,
    581                             const char* file, int line) {
    582   if (grpc_trace_cq_refcount.enabled()) {
    583     gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count);
    584     gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
    585             "CQ:%p unref %" PRIdPTR " -> %" PRIdPTR " %s", cq, val, val - 1,
    586             reason);
    587   }
    588 #else
    589 void grpc_cq_internal_unref(grpc_completion_queue* cq) {
    590 #endif
    591   if (gpr_unref(&cq->owning_refs)) {
    592     cq->vtable->destroy(DATA_FROM_CQ(cq));
    593     cq->poller_vtable->destroy(POLLSET_FROM_CQ(cq));
    594 #ifndef NDEBUG
    595     gpr_free(cq->outstanding_tags);
    596 #endif
    597     gpr_free(cq);
    598   }
    599 }
    600 
    601 #ifndef NDEBUG
    602 static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) {
    603   int found = 0;
    604   if (lock_cq) {
    605     gpr_mu_lock(cq->mu);
    606   }
    607 
    608   for (int i = 0; i < static_cast<int>(cq->outstanding_tag_count); i++) {
    609     if (cq->outstanding_tags[i] == tag) {
    610       cq->outstanding_tag_count--;
    611       GPR_SWAP(void*, cq->outstanding_tags[i],
    612                cq->outstanding_tags[cq->outstanding_tag_count]);
    613       found = 1;
    614       break;
    615     }
    616   }
    617 
    618   if (lock_cq) {
    619     gpr_mu_unlock(cq->mu);
    620   }
    621 
    622   GPR_ASSERT(found);
    623 }
    624 #else
    625 static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) {}
    626 #endif
    627 
    628 /* Atomically increments a counter only if the counter is not zero. Returns
    629  * true if the increment was successful; false if the counter is zero */
    630 static bool atm_inc_if_nonzero(gpr_atm* counter) {
    631   while (true) {
    632     gpr_atm count = gpr_atm_acq_load(counter);
    633     /* If zero, we are done. If not, we must do a CAS (instead of an atomic
    634      * increment) to maintain the contract: do not increment the counter if it
    635      * is zero. */
    636     if (count == 0) {
    637       return false;
    638     } else if (gpr_atm_full_cas(counter, count, count + 1)) {
    639       break;
    640     }
    641   }
    642 
    643   return true;
    644 }
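
/* Illustrative note: a plain fetch-add would not preserve the contract above,
 * because it could take a counter that has already reached zero (i.e. a cq
 * whose shutdown has finished) back to one. The CAS loop only ever increments
 * a value it has observed to be non-zero. */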
    645 
    646 static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag) {
    647   cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
    648   return atm_inc_if_nonzero(&cqd->pending_events);
    649 }
    650 
    651 static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag) {
    652   cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
    653   return atm_inc_if_nonzero(&cqd->pending_events);
    654 }
    655 
    656 static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag) {
    657   cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
    658   return atm_inc_if_nonzero(&cqd->pending_events);
    659 }
    660 
    661 bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) {
    662 #ifndef NDEBUG
    663   gpr_mu_lock(cq->mu);
    664   if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
    665     cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity);
    666     cq->outstanding_tags = static_cast<void**>(gpr_realloc(
    667         cq->outstanding_tags,
    668         sizeof(*cq->outstanding_tags) * cq->outstanding_tag_capacity));
    669   }
    670   cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
    671   gpr_mu_unlock(cq->mu);
    672 #endif
    673   return cq->vtable->begin_op(cq, tag);
    674 }
    675 
    676 /* Queue a GRPC_OP_COMPLETED operation to a completion queue
    677  * (with a completion type of GRPC_CQ_NEXT).
    678  */
    679 static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
    680                                grpc_error* error,
    681                                void (*done)(void* done_arg,
    682                                             grpc_cq_completion* storage),
    683                                void* done_arg, grpc_cq_completion* storage) {
    684   GPR_TIMER_SCOPE("cq_end_op_for_next", 0);
    685 
    686   if (grpc_api_trace.enabled() ||
    687       (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE)) {
    688     const char* errmsg = grpc_error_string(error);
    689     GRPC_API_TRACE(
    690         "cq_end_op_for_next(cq=%p, tag=%p, error=%s, "
    691         "done=%p, done_arg=%p, storage=%p)",
    692         6, (cq, tag, errmsg, done, done_arg, storage));
    693     if (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE) {
    694       gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
    695     }
    696   }
    697   cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
    698   int is_success = (error == GRPC_ERROR_NONE);
    699 
    700   storage->tag = tag;
    701   storage->done = done;
    702   storage->done_arg = done_arg;
    703   storage->next = static_cast<uintptr_t>(is_success);
    704 
    705   cq_check_tag(cq, tag, true); /* Used in debug builds only */
    706 
    707   if ((grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == cq &&
    708       (grpc_cq_completion*)gpr_tls_get(&g_cached_event) == nullptr) {
    709     gpr_tls_set(&g_cached_event, (intptr_t)storage);
    710   } else {
    711     /* Add the completion to the queue */
    712     bool is_first = cq_event_queue_push(&cqd->queue, storage);
    713     gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
    714 
    715     /* Since we do not hold the cq lock here, it is important to do an
    716        'acquire' load here (instead of a 'no_barrier' load) to match the
    717        release store (done via gpr_atm_full_fetch_add(pending_events, -1))
    718        in cq_shutdown_next.
    719        */
    720     bool will_definitely_shutdown = gpr_atm_acq_load(&cqd->pending_events) == 1;
    721 
    722     if (!will_definitely_shutdown) {
    723       /* Only kick if this is the first item queued */
    724       if (is_first) {
    725         gpr_mu_lock(cq->mu);
    726         grpc_error* kick_error =
    727             cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr);
    728         gpr_mu_unlock(cq->mu);
    729 
    730         if (kick_error != GRPC_ERROR_NONE) {
    731           const char* msg = grpc_error_string(kick_error);
    732           gpr_log(GPR_ERROR, "Kick failed: %s", msg);
    733           GRPC_ERROR_UNREF(kick_error);
    734         }
    735       }
    736       if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
    737         GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
    738         gpr_mu_lock(cq->mu);
    739         cq_finish_shutdown_next(cq);
    740         gpr_mu_unlock(cq->mu);
    741         GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
    742       }
    743     } else {
    744       GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
    745       gpr_atm_rel_store(&cqd->pending_events, 0);
    746       gpr_mu_lock(cq->mu);
    747       cq_finish_shutdown_next(cq);
    748       gpr_mu_unlock(cq->mu);
    749       GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
    750     }
    751   }
    752 
    753   GRPC_ERROR_UNREF(error);
    754 }
    755 
    756 /* Queue a GRPC_OP_COMPLETED operation to a completion queue
    757  * (with a completion type of GRPC_CQ_PLUCK).
    758  */
    759 static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag,
    760                                 grpc_error* error,
    761                                 void (*done)(void* done_arg,
    762                                              grpc_cq_completion* storage),
    763                                 void* done_arg, grpc_cq_completion* storage) {
    764   GPR_TIMER_SCOPE("cq_end_op_for_pluck", 0);
    765 
    766   cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
    767   int is_success = (error == GRPC_ERROR_NONE);
    768 
    769   if (grpc_api_trace.enabled() ||
    770       (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE)) {
    771     const char* errmsg = grpc_error_string(error);
    772     GRPC_API_TRACE(
    773         "cq_end_op_for_pluck(cq=%p, tag=%p, error=%s, "
    774         "done=%p, done_arg=%p, storage=%p)",
    775         6, (cq, tag, errmsg, done, done_arg, storage));
    776     if (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE) {
    777       gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
    778     }
    779   }
    780 
    781   storage->tag = tag;
    782   storage->done = done;
    783   storage->done_arg = done_arg;
    784   storage->next =
    785       ((uintptr_t)&cqd->completed_head) | (static_cast<uintptr_t>(is_success));
    786 
    787   gpr_mu_lock(cq->mu);
    788   cq_check_tag(cq, tag, false); /* Used in debug builds only */
    789 
    790   /* Add to the list of completions */
    791   gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
    792   cqd->completed_tail->next =
    793       ((uintptr_t)storage) | (1u & cqd->completed_tail->next);
    794   cqd->completed_tail = storage;
    795 
    796   if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
    797     cq_finish_shutdown_pluck(cq);
    798     gpr_mu_unlock(cq->mu);
    799   } else {
    800     grpc_pollset_worker* pluck_worker = nullptr;
    801     for (int i = 0; i < cqd->num_pluckers; i++) {
    802       if (cqd->pluckers[i].tag == tag) {
    803         pluck_worker = *cqd->pluckers[i].worker;
    804         break;
    805       }
    806     }
    807 
    808     grpc_error* kick_error =
    809         cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), pluck_worker);
    810 
    811     gpr_mu_unlock(cq->mu);
    812 
    813     if (kick_error != GRPC_ERROR_NONE) {
    814       const char* msg = grpc_error_string(kick_error);
    815       gpr_log(GPR_ERROR, "Kick failed: %s", msg);
    816 
    817       GRPC_ERROR_UNREF(kick_error);
    818     }
    819   }
    820 
    821   GRPC_ERROR_UNREF(error);
    822 }
    823 
    824 /* Complete an event on a completion queue of type GRPC_CQ_CALLBACK */
    825 static void cq_end_op_for_callback(
    826     grpc_completion_queue* cq, void* tag, grpc_error* error,
    827     void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
    828     grpc_cq_completion* storage) {
    829   GPR_TIMER_SCOPE("cq_end_op_for_callback", 0);
    830 
    831   cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
    832   bool is_success = (error == GRPC_ERROR_NONE);
    833 
    834   if (grpc_api_trace.enabled() ||
    835       (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE)) {
    836     const char* errmsg = grpc_error_string(error);
    837     GRPC_API_TRACE(
    838         "cq_end_op_for_callback(cq=%p, tag=%p, error=%s, "
    839         "done=%p, done_arg=%p, storage=%p)",
    840         6, (cq, tag, errmsg, done, done_arg, storage));
    841     if (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE) {
    842       gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
    843     }
    844   }
    845 
    846   // The callback-based CQ isn't really a queue at all and thus has no need
    847   // for reserved storage. Invoke the done callback right away to release it.
    848   done(done_arg, storage);
    849 
    850   gpr_mu_lock(cq->mu);
    851   cq_check_tag(cq, tag, false); /* Used in debug builds only */
    852 
    853   gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
    854   if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
    855     cq_finish_shutdown_callback(cq);
    856     gpr_mu_unlock(cq->mu);
    857   } else {
    858     gpr_mu_unlock(cq->mu);
    859   }
    860 
    861   GRPC_ERROR_UNREF(error);
    862 
    863   auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(tag);
    864   (*functor->functor_run)(functor, is_success);
    865 }
    866 
    867 void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
    868                     void (*done)(void* done_arg, grpc_cq_completion* storage),
    869                     void* done_arg, grpc_cq_completion* storage) {
    870   cq->vtable->end_op(cq, tag, error, done, done_arg, storage);
    871 }
    872 
    873 typedef struct {
    874   gpr_atm last_seen_things_queued_ever;
    875   grpc_completion_queue* cq;
    876   grpc_millis deadline;
    877   grpc_cq_completion* stolen_completion;
    878   void* tag; /* for pluck */
    879   bool first_loop;
    880 } cq_is_finished_arg;
    881 
    882 class ExecCtxNext : public grpc_core::ExecCtx {
    883  public:
    884   ExecCtxNext(void* arg) : ExecCtx(0), check_ready_to_finish_arg_(arg) {}
    885 
    886   bool CheckReadyToFinish() override {
    887     cq_is_finished_arg* a =
    888         static_cast<cq_is_finished_arg*>(check_ready_to_finish_arg_);
    889     grpc_completion_queue* cq = a->cq;
    890     cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
    891     GPR_ASSERT(a->stolen_completion == nullptr);
    892 
    893     gpr_atm current_last_seen_things_queued_ever =
    894         gpr_atm_no_barrier_load(&cqd->things_queued_ever);
    895 
    896     if (current_last_seen_things_queued_ever !=
    897         a->last_seen_things_queued_ever) {
    898       a->last_seen_things_queued_ever =
    899           gpr_atm_no_barrier_load(&cqd->things_queued_ever);
    900 
    901       /* Pop a cq_completion from the queue. Returns NULL if the queue
    902        * is empty; it might also return NULL in some cases even when the
    903        * queue is not empty, but that is ok and does not affect
    904        * correctness (though it might affect the tail latencies a bit).
    905        */
    906       a->stolen_completion = cq_event_queue_pop(&cqd->queue);
    907       if (a->stolen_completion != nullptr) {
    908         return true;
    909       }
    910     }
    911     return !a->first_loop && a->deadline < grpc_core::ExecCtx::Get()->Now();
    912   }
    913 
    914  private:
    915   void* check_ready_to_finish_arg_;
    916 };
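
/* Illustrative note: CheckReadyToFinish() above can be consulted (via
 * grpc_core::ExecCtx) while a thread is blocked in the poller; it "steals" a
 * completion as soon as things_queued_ever changes, and cq_next() below then
 * returns the stolen completion (is_finished_arg.stolen_completion) without
 * another full poll cycle. */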
    917 
    918 #ifndef NDEBUG
    919 static void dump_pending_tags(grpc_completion_queue* cq) {
    920   if (!grpc_trace_pending_tags.enabled()) return;
    921 
    922   gpr_strvec v;
    923   gpr_strvec_init(&v);
    924   gpr_strvec_add(&v, gpr_strdup("PENDING TAGS:"));
    925   gpr_mu_lock(cq->mu);
    926   for (size_t i = 0; i < cq->outstanding_tag_count; i++) {
    927     char* s;
    928     gpr_asprintf(&s, " %p", cq->outstanding_tags[i]);
    929     gpr_strvec_add(&v, s);
    930   }
    931   gpr_mu_unlock(cq->mu);
    932   char* out = gpr_strvec_flatten(&v, nullptr);
    933   gpr_strvec_destroy(&v);
    934   gpr_log(GPR_DEBUG, "%s", out);
    935   gpr_free(out);
    936 }
    937 #else
    938 static void dump_pending_tags(grpc_completion_queue* cq) {}
    939 #endif
    940 
    941 static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
    942                           void* reserved) {
    943   GPR_TIMER_SCOPE("grpc_completion_queue_next", 0);
    944 
    945   grpc_event ret;
    946   cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
    947 
    948   GRPC_API_TRACE(
    949       "grpc_completion_queue_next("
    950       "cq=%p, "
    951       "deadline=gpr_timespec { tv_sec: %" PRId64
    952       ", tv_nsec: %d, clock_type: %d }, "
    953       "reserved=%p)",
    954       5,
    955       (cq, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
    956        reserved));
    957   GPR_ASSERT(!reserved);
    958 
    959   dump_pending_tags(cq);
    960 
    961   GRPC_CQ_INTERNAL_REF(cq, "next");
    962 
    963   grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline);
    964   cq_is_finished_arg is_finished_arg = {
    965       gpr_atm_no_barrier_load(&cqd->things_queued_ever),
    966       cq,
    967       deadline_millis,
    968       nullptr,
    969       nullptr,
    970       true};
    971   ExecCtxNext exec_ctx(&is_finished_arg);
    972   for (;;) {
    973     grpc_millis iteration_deadline = deadline_millis;
    974 
    975     if (is_finished_arg.stolen_completion != nullptr) {
    976       grpc_cq_completion* c = is_finished_arg.stolen_completion;
    977       is_finished_arg.stolen_completion = nullptr;
    978       ret.type = GRPC_OP_COMPLETE;
    979       ret.success = c->next & 1u;
    980       ret.tag = c->tag;
    981       c->done(c->done_arg, c);
    982       break;
    983     }
    984 
    985     grpc_cq_completion* c = cq_event_queue_pop(&cqd->queue);
    986 
    987     if (c != nullptr) {
    988       ret.type = GRPC_OP_COMPLETE;
    989       ret.success = c->next & 1u;
    990       ret.tag = c->tag;
    991       c->done(c->done_arg, c);
    992       break;
    993     } else {
    994       /* If c == NULL it means either the queue is empty OR in a transient
    995          inconsistent state. If it is the latter, we should do a 0-timeout poll
    996          so that the thread comes back quickly from poll to make a second
    997          attempt at popping. Not doing this can potentially deadlock this
    998          thread forever (if the deadline is infinity) */
    999       if (cq_event_queue_num_items(&cqd->queue) > 0) {
   1000         iteration_deadline = 0;
   1001       }
   1002     }
   1003 
   1004     if (gpr_atm_acq_load(&cqd->pending_events) == 0) {
   1005       /* Before returning, check if the queue has any items left over (since
   1006          gpr_mpscq_pop() can sometimes return NULL even if the queue is not
   1007          empty). If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */
   1008       if (cq_event_queue_num_items(&cqd->queue) > 0) {
   1009         /* Go to the beginning of the loop. No point doing a poll because
   1010            (cq->shutdown == true) is only possible when there is no pending
   1011            work (i.e. cq->pending_events == 0) and any outstanding completion
   1012            events should have already been queued on this cq */
   1013         continue;
   1014       }
   1015 
   1016       memset(&ret, 0, sizeof(ret));
   1017       ret.type = GRPC_QUEUE_SHUTDOWN;
   1018       break;
   1019     }
   1020 
   1021     if (!is_finished_arg.first_loop &&
   1022         grpc_core::ExecCtx::Get()->Now() >= deadline_millis) {
   1023       memset(&ret, 0, sizeof(ret));
   1024       ret.type = GRPC_QUEUE_TIMEOUT;
   1025       dump_pending_tags(cq);
   1026       break;
   1027     }
   1028 
   1029     /* The main polling work happens in grpc_pollset_work */
   1030     gpr_mu_lock(cq->mu);
   1031     cq->num_polls++;
   1032     grpc_error* err = cq->poller_vtable->work(POLLSET_FROM_CQ(cq), nullptr,
   1033                                               iteration_deadline);
   1034     gpr_mu_unlock(cq->mu);
   1035 
   1036     if (err != GRPC_ERROR_NONE) {
   1037       const char* msg = grpc_error_string(err);
   1038       gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg);
   1039 
   1040       GRPC_ERROR_UNREF(err);
   1041       memset(&ret, 0, sizeof(ret));
   1042       ret.type = GRPC_QUEUE_TIMEOUT;
   1043       dump_pending_tags(cq);
   1044       break;
   1045     }
   1046     is_finished_arg.first_loop = false;
   1047   }
   1048 
   1049   if (cq_event_queue_num_items(&cqd->queue) > 0 &&
   1050       gpr_atm_acq_load(&cqd->pending_events) > 0) {
   1051     gpr_mu_lock(cq->mu);
   1052     cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr);
   1053     gpr_mu_unlock(cq->mu);
   1054   }
   1055 
   1056   GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
   1057   GRPC_CQ_INTERNAL_UNREF(cq, "next");
   1058 
   1059   GPR_ASSERT(is_finished_arg.stolen_completion == nullptr);
   1060 
   1061   return ret;
   1062 }
   1063 
   1064 /* Finishes the completion queue shutdown. This means that there are no more
   1065    completion events / tags expected from the completion queue
   1066    - Must be called under completion queue lock
   1067    - Must be called only once in completion queue's lifetime
   1068    - grpc_completion_queue_shutdown() MUST have been called before calling
   1069    this function */
   1070 static void cq_finish_shutdown_next(grpc_completion_queue* cq) {
   1071   cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
   1072 
   1073   GPR_ASSERT(cqd->shutdown_called);
   1074   GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0);
   1075 
   1076   cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
   1077 }
   1078 
   1079 static void cq_shutdown_next(grpc_completion_queue* cq) {
   1080   cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
   1081 
   1082   /* Need an extra ref for cq here because:
   1083    * We call cq_finish_shutdown_next() below, that would call pollset shutdown.
   1084    * Pollset shutdown decrements the cq ref count which can potentially destroy
   1085    * the cq (if that happens to be the last ref).
   1086    * Creating an extra ref here prevents the cq from getting destroyed while
   1087    * this function is still active */
   1088   GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
   1089   gpr_mu_lock(cq->mu);
   1090   if (cqd->shutdown_called) {
   1091     gpr_mu_unlock(cq->mu);
   1092     GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
   1093     return;
   1094   }
   1095   cqd->shutdown_called = true;
   1096   /* Doing a full_fetch_add (i.e. acq/release) here to match the
   1097    * cq_begin_op_for_next and cq_end_op_for_next functions which read/write
   1098    * on this counter without necessarily holding a lock on cq */
   1099   if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
   1100     cq_finish_shutdown_next(cq);
   1101   }
   1102   gpr_mu_unlock(cq->mu);
   1103   GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
   1104 }
   1105 
   1106 grpc_event grpc_completion_queue_next(grpc_completion_queue* cq,
   1107                                       gpr_timespec deadline, void* reserved) {
   1108   return cq->vtable->next(cq, deadline, reserved);
   1109 }
   1110 
   1111 static int add_plucker(grpc_completion_queue* cq, void* tag,
   1112                        grpc_pollset_worker** worker) {
   1113   cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
   1114   if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
   1115     return 0;
   1116   }
   1117   cqd->pluckers[cqd->num_pluckers].tag = tag;
   1118   cqd->pluckers[cqd->num_pluckers].worker = worker;
   1119   cqd->num_pluckers++;
   1120   return 1;
   1121 }
   1122 
   1123 static void del_plucker(grpc_completion_queue* cq, void* tag,
   1124                         grpc_pollset_worker** worker) {
   1125   cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
   1126   for (int i = 0; i < cqd->num_pluckers; i++) {
   1127     if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) {
   1128       cqd->num_pluckers--;
   1129       GPR_SWAP(plucker, cqd->pluckers[i], cqd->pluckers[cqd->num_pluckers]);
   1130       return;
   1131     }
   1132   }
   1133   GPR_UNREACHABLE_CODE(return );
   1134 }
   1135 
   1136 class ExecCtxPluck : public grpc_core::ExecCtx {
   1137  public:
   1138   ExecCtxPluck(void* arg) : ExecCtx(0), check_ready_to_finish_arg_(arg) {}
   1139 
   1140   bool CheckReadyToFinish() override {
   1141     cq_is_finished_arg* a =
   1142         static_cast<cq_is_finished_arg*>(check_ready_to_finish_arg_);
   1143     grpc_completion_queue* cq = a->cq;
   1144     cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
   1145 
   1146     GPR_ASSERT(a->stolen_completion == nullptr);
   1147     gpr_atm current_last_seen_things_queued_ever =
   1148         gpr_atm_no_barrier_load(&cqd->things_queued_ever);
   1149     if (current_last_seen_things_queued_ever !=
   1150         a->last_seen_things_queued_ever) {
   1151       gpr_mu_lock(cq->mu);
   1152       a->last_seen_things_queued_ever =
   1153           gpr_atm_no_barrier_load(&cqd->things_queued_ever);
   1154       grpc_cq_completion* c;
   1155       grpc_cq_completion* prev = &cqd->completed_head;
   1156       while ((c = (grpc_cq_completion*)(prev->next &
   1157                                         ~static_cast<uintptr_t>(1))) !=
   1158              &cqd->completed_head) {
   1159         if (c->tag == a->tag) {
   1160           prev->next = (prev->next & static_cast<uintptr_t>(1)) |
   1161                        (c->next & ~static_cast<uintptr_t>(1));
   1162           if (c == cqd->completed_tail) {
   1163             cqd->completed_tail = prev;
   1164           }
   1165           gpr_mu_unlock(cq->mu);
   1166           a->stolen_completion = c;
   1167           return true;
   1168         }
   1169         prev = c;
   1170       }
   1171       gpr_mu_unlock(cq->mu);
   1172     }
   1173     return !a->first_loop && a->deadline < grpc_core::ExecCtx::Get()->Now();
   1174   }
   1175 
   1176  private:
   1177   void* check_ready_to_finish_arg_;
   1178 };
   1179 
   1180 static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
   1181                            gpr_timespec deadline, void* reserved) {
   1182   GPR_TIMER_SCOPE("grpc_completion_queue_pluck", 0);
   1183 
   1184   grpc_event ret;
   1185   grpc_cq_completion* c;
   1186   grpc_cq_completion* prev;
   1187   grpc_pollset_worker* worker = nullptr;
   1188   cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
   1189 
   1190   if (grpc_cq_pluck_trace.enabled()) {
   1191     GRPC_API_TRACE(
   1192         "grpc_completion_queue_pluck("
   1193         "cq=%p, tag=%p, "
   1194         "deadline=gpr_timespec { tv_sec: %" PRId64
   1195         ", tv_nsec: %d, clock_type: %d }, "
   1196         "reserved=%p)",
   1197         6,
   1198         (cq, tag, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
   1199          reserved));
   1200   }
   1201   GPR_ASSERT(!reserved);
   1202 
   1203   dump_pending_tags(cq);
   1204 
   1205   GRPC_CQ_INTERNAL_REF(cq, "pluck");
   1206   gpr_mu_lock(cq->mu);
   1207   grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline);
   1208   cq_is_finished_arg is_finished_arg = {
   1209       gpr_atm_no_barrier_load(&cqd->things_queued_ever),
   1210       cq,
   1211       deadline_millis,
   1212       nullptr,
   1213       tag,
   1214       true};
   1215   ExecCtxPluck exec_ctx(&is_finished_arg);
   1216   for (;;) {
   1217     if (is_finished_arg.stolen_completion != nullptr) {
   1218       gpr_mu_unlock(cq->mu);
   1219       c = is_finished_arg.stolen_completion;
   1220       is_finished_arg.stolen_completion = nullptr;
   1221       ret.type = GRPC_OP_COMPLETE;
   1222       ret.success = c->next & 1u;
   1223       ret.tag = c->tag;
   1224       c->done(c->done_arg, c);
   1225       break;
   1226     }
   1227     prev = &cqd->completed_head;
   1228     while (
   1229         (c = (grpc_cq_completion*)(prev->next & ~static_cast<uintptr_t>(1))) !=
   1230         &cqd->completed_head) {
   1231       if (c->tag == tag) {
   1232         prev->next = (prev->next & static_cast<uintptr_t>(1)) |
   1233                      (c->next & ~static_cast<uintptr_t>(1));
   1234         if (c == cqd->completed_tail) {
   1235           cqd->completed_tail = prev;
   1236         }
   1237         gpr_mu_unlock(cq->mu);
   1238         ret.type = GRPC_OP_COMPLETE;
   1239         ret.success = c->next & 1u;
   1240         ret.tag = c->tag;
   1241         c->done(c->done_arg, c);
   1242         goto done;
   1243       }
   1244       prev = c;
   1245     }
   1246     if (gpr_atm_no_barrier_load(&cqd->shutdown)) {
   1247       gpr_mu_unlock(cq->mu);
   1248       memset(&ret, 0, sizeof(ret));
   1249       ret.type = GRPC_QUEUE_SHUTDOWN;
   1250       break;
   1251     }
   1252     if (!add_plucker(cq, tag, &worker)) {
   1253       gpr_log(GPR_DEBUG,
   1254               "Too many outstanding grpc_completion_queue_pluck calls: maximum "
   1255               "is %d",
   1256               GRPC_MAX_COMPLETION_QUEUE_PLUCKERS);
   1257       gpr_mu_unlock(cq->mu);
   1258       memset(&ret, 0, sizeof(ret));
   1259       /* TODO(ctiller): should we use a different result here */
   1260       ret.type = GRPC_QUEUE_TIMEOUT;
   1261       dump_pending_tags(cq);
   1262       break;
   1263     }
   1264     if (!is_finished_arg.first_loop &&
   1265         grpc_core::ExecCtx::Get()->Now() >= deadline_millis) {
   1266       del_plucker(cq, tag, &worker);
   1267       gpr_mu_unlock(cq->mu);
   1268       memset(&ret, 0, sizeof(ret));
   1269       ret.type = GRPC_QUEUE_TIMEOUT;
   1270       dump_pending_tags(cq);
   1271       break;
   1272     }
   1273     cq->num_polls++;
   1274     grpc_error* err =
   1275         cq->poller_vtable->work(POLLSET_FROM_CQ(cq), &worker, deadline_millis);
   1276     if (err != GRPC_ERROR_NONE) {
   1277       del_plucker(cq, tag, &worker);
   1278       gpr_mu_unlock(cq->mu);
   1279       const char* msg = grpc_error_string(err);
   1280       gpr_log(GPR_ERROR, "Completion queue pluck failed: %s", msg);
   1281 
   1282       GRPC_ERROR_UNREF(err);
   1283       memset(&ret, 0, sizeof(ret));
   1284       ret.type = GRPC_QUEUE_TIMEOUT;
   1285       dump_pending_tags(cq);
   1286       break;
   1287     }
   1288     is_finished_arg.first_loop = false;
   1289     del_plucker(cq, tag, &worker);
   1290   }
   1291 done:
   1292   GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
   1293   GRPC_CQ_INTERNAL_UNREF(cq, "pluck");
   1294 
   1295   GPR_ASSERT(is_finished_arg.stolen_completion == nullptr);
   1296 
   1297   return ret;
   1298 }
   1299 
   1300 grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag,
   1301                                        gpr_timespec deadline, void* reserved) {
   1302   return cq->vtable->pluck(cq, tag, deadline, reserved);
   1303 }
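
/* Illustrative caller-side sketch of pluck usage. At most
 * GRPC_MAX_COMPLETION_QUEUE_PLUCKERS threads may pluck concurrently, as
 * enforced by add_plucker() above:
 *
 *   grpc_event ev = grpc_completion_queue_pluck(
 *       cq, tag, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
 *   // With an infinite deadline and no shutdown, this returns the event
 *   // completed for `tag`.
 *   GPR_ASSERT(ev.type == GRPC_OP_COMPLETE && ev.tag == tag);
 */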
   1304 
   1305 static void cq_finish_shutdown_pluck(grpc_completion_queue* cq) {
   1306   cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
   1307 
   1308   GPR_ASSERT(cqd->shutdown_called);
   1309   GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown));
   1310   gpr_atm_no_barrier_store(&cqd->shutdown, 1);
   1311 
   1312   cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
   1313 }
   1314 
   1315 /* NOTE: This function is almost exactly identical to cq_shutdown_next() but
   1316  * merging them is a bit tricky and probably not worth it */
   1317 static void cq_shutdown_pluck(grpc_completion_queue* cq) {
   1318   cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
   1319 
   1320   /* Need an extra ref for cq here because:
   1321    * We call cq_finish_shutdown_pluck() below, that would call pollset shutdown.
   1322    * Pollset shutdown decrements the cq ref count which can potentially destroy
   1323    * the cq (if that happens to be the last ref).
   1324    * Creating an extra ref here prevents the cq from getting destroyed while
   1325    * this function is still active */
   1326   GRPC_CQ_INTERNAL_REF(cq, "shutting_down (pluck cq)");
   1327   gpr_mu_lock(cq->mu);
   1328   if (cqd->shutdown_called) {
   1329     gpr_mu_unlock(cq->mu);
   1330     GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)");
   1331     return;
   1332   }
   1333   cqd->shutdown_called = true;
   1334   if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
   1335     cq_finish_shutdown_pluck(cq);
   1336   }
   1337   gpr_mu_unlock(cq->mu);
   1338   GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)");
   1339 }
   1340 
   1341 static void cq_finish_shutdown_callback(grpc_completion_queue* cq) {
   1342   cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
   1343   auto* callback = cqd->shutdown_callback;
   1344 
   1345   GPR_ASSERT(cqd->shutdown_called);
   1346 
   1347   cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
   1348   (*callback->functor_run)(callback, true);
   1349 }
   1350 
   1351 static void cq_shutdown_callback(grpc_completion_queue* cq) {
   1352   cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
   1353 
   1354   /* Need an extra ref for cq here because:
   1355    * We call cq_finish_shutdown_callback() below, which calls pollset shutdown.
   1356    * Pollset shutdown decrements the cq ref count which can potentially destroy
   1357    * the cq (if that happens to be the last ref).
   1358    * Creating an extra ref here prevents the cq from getting destroyed while
   1359    * this function is still active */
   1360   GRPC_CQ_INTERNAL_REF(cq, "shutting_down (callback cq)");
   1361   gpr_mu_lock(cq->mu);
   1362   if (cqd->shutdown_called) {
   1363     gpr_mu_unlock(cq->mu);
   1364     GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
   1365     return;
   1366   }
   1367   cqd->shutdown_called = true;
   1368   if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
   1369     gpr_mu_unlock(cq->mu);
   1370     cq_finish_shutdown_callback(cq);
   1371   } else {
   1372     gpr_mu_unlock(cq->mu);
   1373   }
   1374   GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
   1375 }
   1376 
   1377 /* Shutdown simply drops a ref that we reserved at creation time; if we drop
   1378    to zero here, then enter shutdown mode and wake up any waiters */
   1379 void grpc_completion_queue_shutdown(grpc_completion_queue* cq) {
   1380   GPR_TIMER_SCOPE("grpc_completion_queue_shutdown", 0);
   1381   grpc_core::ExecCtx exec_ctx;
   1382   GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq));
   1383   cq->vtable->shutdown(cq);
   1384 }
   1385 
   1386 void grpc_completion_queue_destroy(grpc_completion_queue* cq) {
   1387   GPR_TIMER_SCOPE("grpc_completion_queue_destroy", 0);
   1388   GRPC_API_TRACE("grpc_completion_queue_destroy(cq=%p)", 1, (cq));
   1389   grpc_completion_queue_shutdown(cq);
   1390 
   1391   grpc_core::ExecCtx exec_ctx;
   1392   GRPC_CQ_INTERNAL_UNREF(cq, "destroy");
   1393 }
   1394 
   1395 grpc_pollset* grpc_cq_pollset(grpc_completion_queue* cq) {
   1396   return cq->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cq) : nullptr;
   1397 }
   1398 
   1399 bool grpc_cq_can_listen(grpc_completion_queue* cq) {
   1400   return cq->poller_vtable->can_listen;
   1401 }
   1402