1 /* 2 * 3 * Copyright 2015-2016 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 #include <grpc/support/port_platform.h> 19 20 #include "src/core/lib/surface/completion_queue.h" 21 22 #include <inttypes.h> 23 #include <stdio.h> 24 #include <string.h> 25 26 #include <grpc/support/alloc.h> 27 #include <grpc/support/atm.h> 28 #include <grpc/support/log.h> 29 #include <grpc/support/string_util.h> 30 #include <grpc/support/time.h> 31 32 #include "src/core/lib/debug/stats.h" 33 #include "src/core/lib/gpr/spinlock.h" 34 #include "src/core/lib/gpr/string.h" 35 #include "src/core/lib/gpr/tls.h" 36 #include "src/core/lib/iomgr/pollset.h" 37 #include "src/core/lib/iomgr/timer.h" 38 #include "src/core/lib/profiling/timers.h" 39 #include "src/core/lib/surface/api_trace.h" 40 #include "src/core/lib/surface/call.h" 41 #include "src/core/lib/surface/event_string.h" 42 43 grpc_core::TraceFlag grpc_trace_operation_failures(false, "op_failure"); 44 grpc_core::DebugOnlyTraceFlag grpc_trace_pending_tags(false, "pending_tags"); 45 grpc_core::DebugOnlyTraceFlag grpc_trace_cq_refcount(false, "cq_refcount"); 46 47 // Specifies a cq thread local cache. 48 // The first event that occurs on a thread 49 // with a cq cache will go into that cache, and 50 // will only be returned on the thread that initialized the cache. 51 // NOTE: Only one event will ever be cached. 52 GPR_TLS_DECL(g_cached_event); 53 GPR_TLS_DECL(g_cached_cq); 54 55 typedef struct { 56 grpc_pollset_worker** worker; 57 void* tag; 58 } plucker; 59 60 typedef struct { 61 bool can_get_pollset; 62 bool can_listen; 63 size_t (*size)(void); 64 void (*init)(grpc_pollset* pollset, gpr_mu** mu); 65 grpc_error* (*kick)(grpc_pollset* pollset, 66 grpc_pollset_worker* specific_worker); 67 grpc_error* (*work)(grpc_pollset* pollset, grpc_pollset_worker** worker, 68 grpc_millis deadline); 69 void (*shutdown)(grpc_pollset* pollset, grpc_closure* closure); 70 void (*destroy)(grpc_pollset* pollset); 71 } cq_poller_vtable; 72 73 typedef struct non_polling_worker { 74 gpr_cv cv; 75 bool kicked; 76 struct non_polling_worker* next; 77 struct non_polling_worker* prev; 78 } non_polling_worker; 79 80 typedef struct { 81 gpr_mu mu; 82 non_polling_worker* root; 83 grpc_closure* shutdown; 84 } non_polling_poller; 85 86 static size_t non_polling_poller_size(void) { 87 return sizeof(non_polling_poller); 88 } 89 90 static void non_polling_poller_init(grpc_pollset* pollset, gpr_mu** mu) { 91 non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset); 92 gpr_mu_init(&npp->mu); 93 *mu = &npp->mu; 94 } 95 96 static void non_polling_poller_destroy(grpc_pollset* pollset) { 97 non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset); 98 gpr_mu_destroy(&npp->mu); 99 } 100 101 static grpc_error* non_polling_poller_work(grpc_pollset* pollset, 102 grpc_pollset_worker** worker, 103 grpc_millis deadline) { 104 non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset); 105 if (npp->shutdown) return GRPC_ERROR_NONE; 106 non_polling_worker w; 107 gpr_cv_init(&w.cv); 108 if (worker != nullptr) *worker = reinterpret_cast<grpc_pollset_worker*>(&w); 109 if (npp->root == nullptr) { 110 npp->root = w.next = w.prev = &w; 111 } else { 112 w.next = npp->root; 113 w.prev = w.next->prev; 114 w.next->prev = w.prev->next = &w; 115 } 116 w.kicked = false; 117 gpr_timespec deadline_ts = 118 grpc_millis_to_timespec(deadline, GPR_CLOCK_MONOTONIC); 119 while (!npp->shutdown && !w.kicked && 120 !gpr_cv_wait(&w.cv, &npp->mu, deadline_ts)) 121 ; 122 grpc_core::ExecCtx::Get()->InvalidateNow(); 123 if (&w == npp->root) { 124 npp->root = w.next; 125 if (&w == npp->root) { 126 if (npp->shutdown) { 127 GRPC_CLOSURE_SCHED(npp->shutdown, GRPC_ERROR_NONE); 128 } 129 npp->root = nullptr; 130 } 131 } 132 w.next->prev = w.prev; 133 w.prev->next = w.next; 134 gpr_cv_destroy(&w.cv); 135 if (worker != nullptr) *worker = nullptr; 136 return GRPC_ERROR_NONE; 137 } 138 139 static grpc_error* non_polling_poller_kick( 140 grpc_pollset* pollset, grpc_pollset_worker* specific_worker) { 141 non_polling_poller* p = reinterpret_cast<non_polling_poller*>(pollset); 142 if (specific_worker == nullptr) 143 specific_worker = reinterpret_cast<grpc_pollset_worker*>(p->root); 144 if (specific_worker != nullptr) { 145 non_polling_worker* w = 146 reinterpret_cast<non_polling_worker*>(specific_worker); 147 if (!w->kicked) { 148 w->kicked = true; 149 gpr_cv_signal(&w->cv); 150 } 151 } 152 return GRPC_ERROR_NONE; 153 } 154 155 static void non_polling_poller_shutdown(grpc_pollset* pollset, 156 grpc_closure* closure) { 157 non_polling_poller* p = reinterpret_cast<non_polling_poller*>(pollset); 158 GPR_ASSERT(closure != nullptr); 159 p->shutdown = closure; 160 if (p->root == nullptr) { 161 GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE); 162 } else { 163 non_polling_worker* w = p->root; 164 do { 165 gpr_cv_signal(&w->cv); 166 w = w->next; 167 } while (w != p->root); 168 } 169 } 170 171 static const cq_poller_vtable g_poller_vtable_by_poller_type[] = { 172 /* GRPC_CQ_DEFAULT_POLLING */ 173 {true, true, grpc_pollset_size, grpc_pollset_init, grpc_pollset_kick, 174 grpc_pollset_work, grpc_pollset_shutdown, grpc_pollset_destroy}, 175 /* GRPC_CQ_NON_LISTENING */ 176 {true, false, grpc_pollset_size, grpc_pollset_init, grpc_pollset_kick, 177 grpc_pollset_work, grpc_pollset_shutdown, grpc_pollset_destroy}, 178 /* GRPC_CQ_NON_POLLING */ 179 {false, false, non_polling_poller_size, non_polling_poller_init, 180 non_polling_poller_kick, non_polling_poller_work, 181 non_polling_poller_shutdown, non_polling_poller_destroy}, 182 }; 183 184 typedef struct cq_vtable { 185 grpc_cq_completion_type cq_completion_type; 186 size_t data_size; 187 void (*init)(void* data, 188 grpc_experimental_completion_queue_functor* shutdown_callback); 189 void (*shutdown)(grpc_completion_queue* cq); 190 void (*destroy)(void* data); 191 bool (*begin_op)(grpc_completion_queue* cq, void* tag); 192 void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error* error, 193 void (*done)(void* done_arg, grpc_cq_completion* storage), 194 void* done_arg, grpc_cq_completion* storage); 195 grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline, 196 void* reserved); 197 grpc_event (*pluck)(grpc_completion_queue* cq, void* tag, 198 gpr_timespec deadline, void* reserved); 199 } cq_vtable; 200 201 /* Queue that holds the cq_completion_events. Internally uses gpr_mpscq queue 202 * (a lockfree multiproducer single consumer queue). It uses a queue_lock 203 * to support multiple consumers. 204 * Only used in completion queues whose completion_type is GRPC_CQ_NEXT */ 205 typedef struct grpc_cq_event_queue { 206 /* Spinlock to serialize consumers i.e pop() operations */ 207 gpr_spinlock queue_lock; 208 209 gpr_mpscq queue; 210 211 /* A lazy counter of number of items in the queue. This is NOT atomically 212 incremented/decremented along with push/pop operations and hence is only 213 eventually consistent */ 214 gpr_atm num_queue_items; 215 } grpc_cq_event_queue; 216 217 typedef struct cq_next_data { 218 /** Completed events for completion-queues of type GRPC_CQ_NEXT */ 219 grpc_cq_event_queue queue; 220 221 /** Counter of how many things have ever been queued on this completion queue 222 useful for avoiding locks to check the queue */ 223 gpr_atm things_queued_ever; 224 225 /* Number of outstanding events (+1 if not shut down) */ 226 gpr_atm pending_events; 227 228 /** 0 initially. 1 once we initiated shutdown */ 229 bool shutdown_called; 230 } cq_next_data; 231 232 typedef struct cq_pluck_data { 233 /** Completed events for completion-queues of type GRPC_CQ_PLUCK */ 234 grpc_cq_completion completed_head; 235 grpc_cq_completion* completed_tail; 236 237 /** Number of pending events (+1 if we're not shutdown) */ 238 gpr_atm pending_events; 239 240 /** Counter of how many things have ever been queued on this completion queue 241 useful for avoiding locks to check the queue */ 242 gpr_atm things_queued_ever; 243 244 /** 0 initially. 1 once we completed shutting */ 245 /* TODO: (sreek) This is not needed since (shutdown == 1) if and only if 246 * (pending_events == 0). So consider removing this in future and use 247 * pending_events */ 248 gpr_atm shutdown; 249 250 /** 0 initially. 1 once we initiated shutdown */ 251 bool shutdown_called; 252 253 int num_pluckers; 254 plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS]; 255 } cq_pluck_data; 256 257 typedef struct cq_callback_data { 258 /** No actual completed events queue, unlike other types */ 259 260 /** Number of pending events (+1 if we're not shutdown) */ 261 gpr_atm pending_events; 262 263 /** Counter of how many things have ever been queued on this completion queue 264 useful for avoiding locks to check the queue */ 265 gpr_atm things_queued_ever; 266 267 /** 0 initially. 1 once we initiated shutdown */ 268 bool shutdown_called; 269 270 /** A callback that gets invoked when the CQ completes shutdown */ 271 grpc_experimental_completion_queue_functor* shutdown_callback; 272 } cq_callback_data; 273 274 /* Completion queue structure */ 275 struct grpc_completion_queue { 276 /** Once owning_refs drops to zero, we will destroy the cq */ 277 gpr_refcount owning_refs; 278 279 gpr_mu* mu; 280 281 const cq_vtable* vtable; 282 const cq_poller_vtable* poller_vtable; 283 284 #ifndef NDEBUG 285 void** outstanding_tags; 286 size_t outstanding_tag_count; 287 size_t outstanding_tag_capacity; 288 #endif 289 290 grpc_closure pollset_shutdown_done; 291 int num_polls; 292 }; 293 294 /* Forward declarations */ 295 static void cq_finish_shutdown_next(grpc_completion_queue* cq); 296 static void cq_finish_shutdown_pluck(grpc_completion_queue* cq); 297 static void cq_finish_shutdown_callback(grpc_completion_queue* cq); 298 static void cq_shutdown_next(grpc_completion_queue* cq); 299 static void cq_shutdown_pluck(grpc_completion_queue* cq); 300 static void cq_shutdown_callback(grpc_completion_queue* cq); 301 302 static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag); 303 static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag); 304 static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag); 305 306 // A cq_end_op function is called when an operation on a given CQ with 307 // a given tag has completed. The storage argument is a reference to the 308 // space reserved for this completion as it is placed into the corresponding 309 // queue. The done argument is a callback that will be invoked when it is 310 // safe to free up that storage. The storage MUST NOT be freed until the 311 // done callback is invoked. 312 static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag, 313 grpc_error* error, 314 void (*done)(void* done_arg, 315 grpc_cq_completion* storage), 316 void* done_arg, grpc_cq_completion* storage); 317 318 static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag, 319 grpc_error* error, 320 void (*done)(void* done_arg, 321 grpc_cq_completion* storage), 322 void* done_arg, grpc_cq_completion* storage); 323 324 static void cq_end_op_for_callback(grpc_completion_queue* cq, void* tag, 325 grpc_error* error, 326 void (*done)(void* done_arg, 327 grpc_cq_completion* storage), 328 void* done_arg, grpc_cq_completion* storage); 329 330 static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, 331 void* reserved); 332 333 static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, 334 gpr_timespec deadline, void* reserved); 335 336 // Note that cq_init_next and cq_init_pluck do not use the shutdown_callback 337 static void cq_init_next( 338 void* data, grpc_experimental_completion_queue_functor* shutdown_callback); 339 static void cq_init_pluck( 340 void* data, grpc_experimental_completion_queue_functor* shutdown_callback); 341 static void cq_init_callback( 342 void* data, grpc_experimental_completion_queue_functor* shutdown_callback); 343 static void cq_destroy_next(void* data); 344 static void cq_destroy_pluck(void* data); 345 static void cq_destroy_callback(void* data); 346 347 /* Completion queue vtables based on the completion-type */ 348 static const cq_vtable g_cq_vtable[] = { 349 /* GRPC_CQ_NEXT */ 350 {GRPC_CQ_NEXT, sizeof(cq_next_data), cq_init_next, cq_shutdown_next, 351 cq_destroy_next, cq_begin_op_for_next, cq_end_op_for_next, cq_next, 352 nullptr}, 353 /* GRPC_CQ_PLUCK */ 354 {GRPC_CQ_PLUCK, sizeof(cq_pluck_data), cq_init_pluck, cq_shutdown_pluck, 355 cq_destroy_pluck, cq_begin_op_for_pluck, cq_end_op_for_pluck, nullptr, 356 cq_pluck}, 357 /* GRPC_CQ_CALLBACK */ 358 {GRPC_CQ_CALLBACK, sizeof(cq_callback_data), cq_init_callback, 359 cq_shutdown_callback, cq_destroy_callback, cq_begin_op_for_callback, 360 cq_end_op_for_callback, nullptr, nullptr}, 361 }; 362 363 #define DATA_FROM_CQ(cq) ((void*)(cq + 1)) 364 #define POLLSET_FROM_CQ(cq) \ 365 ((grpc_pollset*)(cq->vtable->data_size + (char*)DATA_FROM_CQ(cq))) 366 367 grpc_core::TraceFlag grpc_cq_pluck_trace(false, "queue_pluck"); 368 369 #define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event) \ 370 if (grpc_api_trace.enabled() && (grpc_cq_pluck_trace.enabled() || \ 371 (event)->type != GRPC_QUEUE_TIMEOUT)) { \ 372 char* _ev = grpc_event_string(event); \ 373 gpr_log(GPR_INFO, "RETURN_EVENT[%p]: %s", cq, _ev); \ 374 gpr_free(_ev); \ 375 } 376 377 static void on_pollset_shutdown_done(void* cq, grpc_error* error); 378 379 void grpc_cq_global_init() { 380 gpr_tls_init(&g_cached_event); 381 gpr_tls_init(&g_cached_cq); 382 } 383 384 void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue* cq) { 385 if ((grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == nullptr) { 386 gpr_tls_set(&g_cached_event, (intptr_t)0); 387 gpr_tls_set(&g_cached_cq, (intptr_t)cq); 388 } 389 } 390 391 int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq, 392 void** tag, int* ok) { 393 grpc_cq_completion* storage = 394 (grpc_cq_completion*)gpr_tls_get(&g_cached_event); 395 int ret = 0; 396 if (storage != nullptr && 397 (grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == cq) { 398 *tag = storage->tag; 399 grpc_core::ExecCtx exec_ctx; 400 *ok = (storage->next & static_cast<uintptr_t>(1)) == 1; 401 storage->done(storage->done_arg, storage); 402 ret = 1; 403 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq); 404 if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { 405 GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); 406 gpr_mu_lock(cq->mu); 407 cq_finish_shutdown_next(cq); 408 gpr_mu_unlock(cq->mu); 409 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down"); 410 } 411 } 412 gpr_tls_set(&g_cached_event, (intptr_t)0); 413 gpr_tls_set(&g_cached_cq, (intptr_t)0); 414 415 return ret; 416 } 417 418 static void cq_event_queue_init(grpc_cq_event_queue* q) { 419 gpr_mpscq_init(&q->queue); 420 q->queue_lock = GPR_SPINLOCK_INITIALIZER; 421 gpr_atm_no_barrier_store(&q->num_queue_items, 0); 422 } 423 424 static void cq_event_queue_destroy(grpc_cq_event_queue* q) { 425 gpr_mpscq_destroy(&q->queue); 426 } 427 428 static bool cq_event_queue_push(grpc_cq_event_queue* q, grpc_cq_completion* c) { 429 gpr_mpscq_push(&q->queue, reinterpret_cast<gpr_mpscq_node*>(c)); 430 return gpr_atm_no_barrier_fetch_add(&q->num_queue_items, 1) == 0; 431 } 432 433 static grpc_cq_completion* cq_event_queue_pop(grpc_cq_event_queue* q) { 434 grpc_cq_completion* c = nullptr; 435 436 if (gpr_spinlock_trylock(&q->queue_lock)) { 437 GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES(); 438 439 bool is_empty = false; 440 c = reinterpret_cast<grpc_cq_completion*>( 441 gpr_mpscq_pop_and_check_end(&q->queue, &is_empty)); 442 gpr_spinlock_unlock(&q->queue_lock); 443 444 if (c == nullptr && !is_empty) { 445 GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES(); 446 } 447 } else { 448 GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES(); 449 } 450 451 if (c) { 452 gpr_atm_no_barrier_fetch_add(&q->num_queue_items, -1); 453 } 454 455 return c; 456 } 457 458 /* Note: The counter is not incremented/decremented atomically with push/pop. 459 * The count is only eventually consistent */ 460 static long cq_event_queue_num_items(grpc_cq_event_queue* q) { 461 return static_cast<long>(gpr_atm_no_barrier_load(&q->num_queue_items)); 462 } 463 464 grpc_completion_queue* grpc_completion_queue_create_internal( 465 grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type, 466 grpc_experimental_completion_queue_functor* shutdown_callback) { 467 GPR_TIMER_SCOPE("grpc_completion_queue_create_internal", 0); 468 469 grpc_completion_queue* cq; 470 471 GRPC_API_TRACE( 472 "grpc_completion_queue_create_internal(completion_type=%d, " 473 "polling_type=%d)", 474 2, (completion_type, polling_type)); 475 476 const cq_vtable* vtable = &g_cq_vtable[completion_type]; 477 const cq_poller_vtable* poller_vtable = 478 &g_poller_vtable_by_poller_type[polling_type]; 479 480 grpc_core::ExecCtx exec_ctx; 481 GRPC_STATS_INC_CQS_CREATED(); 482 483 cq = static_cast<grpc_completion_queue*>( 484 gpr_zalloc(sizeof(grpc_completion_queue) + vtable->data_size + 485 poller_vtable->size())); 486 487 cq->vtable = vtable; 488 cq->poller_vtable = poller_vtable; 489 490 /* One for destroy(), one for pollset_shutdown */ 491 gpr_ref_init(&cq->owning_refs, 2); 492 493 poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->mu); 494 vtable->init(DATA_FROM_CQ(cq), shutdown_callback); 495 496 GRPC_CLOSURE_INIT(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq, 497 grpc_schedule_on_exec_ctx); 498 return cq; 499 } 500 501 static void cq_init_next( 502 void* data, grpc_experimental_completion_queue_functor* shutdown_callback) { 503 cq_next_data* cqd = static_cast<cq_next_data*>(data); 504 /* Initial count is dropped by grpc_completion_queue_shutdown */ 505 gpr_atm_no_barrier_store(&cqd->pending_events, 1); 506 cqd->shutdown_called = false; 507 gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0); 508 cq_event_queue_init(&cqd->queue); 509 } 510 511 static void cq_destroy_next(void* data) { 512 cq_next_data* cqd = static_cast<cq_next_data*>(data); 513 GPR_ASSERT(cq_event_queue_num_items(&cqd->queue) == 0); 514 cq_event_queue_destroy(&cqd->queue); 515 } 516 517 static void cq_init_pluck( 518 void* data, grpc_experimental_completion_queue_functor* shutdown_callback) { 519 cq_pluck_data* cqd = static_cast<cq_pluck_data*>(data); 520 /* Initial count is dropped by grpc_completion_queue_shutdown */ 521 gpr_atm_no_barrier_store(&cqd->pending_events, 1); 522 cqd->completed_tail = &cqd->completed_head; 523 cqd->completed_head.next = (uintptr_t)cqd->completed_tail; 524 gpr_atm_no_barrier_store(&cqd->shutdown, 0); 525 cqd->shutdown_called = false; 526 cqd->num_pluckers = 0; 527 gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0); 528 } 529 530 static void cq_destroy_pluck(void* data) { 531 cq_pluck_data* cqd = static_cast<cq_pluck_data*>(data); 532 GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head); 533 } 534 535 static void cq_init_callback( 536 void* data, grpc_experimental_completion_queue_functor* shutdown_callback) { 537 cq_callback_data* cqd = static_cast<cq_callback_data*>(data); 538 /* Initial count is dropped by grpc_completion_queue_shutdown */ 539 gpr_atm_no_barrier_store(&cqd->pending_events, 1); 540 cqd->shutdown_called = false; 541 gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0); 542 cqd->shutdown_callback = shutdown_callback; 543 } 544 545 static void cq_destroy_callback(void* data) {} 546 547 grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cq) { 548 return cq->vtable->cq_completion_type; 549 } 550 551 int grpc_get_cq_poll_num(grpc_completion_queue* cq) { 552 int cur_num_polls; 553 gpr_mu_lock(cq->mu); 554 cur_num_polls = cq->num_polls; 555 gpr_mu_unlock(cq->mu); 556 return cur_num_polls; 557 } 558 559 #ifndef NDEBUG 560 void grpc_cq_internal_ref(grpc_completion_queue* cq, const char* reason, 561 const char* file, int line) { 562 if (grpc_trace_cq_refcount.enabled()) { 563 gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count); 564 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, 565 "CQ:%p ref %" PRIdPTR " -> %" PRIdPTR " %s", cq, val, val + 1, 566 reason); 567 } 568 #else 569 void grpc_cq_internal_ref(grpc_completion_queue* cq) { 570 #endif 571 gpr_ref(&cq->owning_refs); 572 } 573 574 static void on_pollset_shutdown_done(void* arg, grpc_error* error) { 575 grpc_completion_queue* cq = static_cast<grpc_completion_queue*>(arg); 576 GRPC_CQ_INTERNAL_UNREF(cq, "pollset_destroy"); 577 } 578 579 #ifndef NDEBUG 580 void grpc_cq_internal_unref(grpc_completion_queue* cq, const char* reason, 581 const char* file, int line) { 582 if (grpc_trace_cq_refcount.enabled()) { 583 gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count); 584 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, 585 "CQ:%p unref %" PRIdPTR " -> %" PRIdPTR " %s", cq, val, val - 1, 586 reason); 587 } 588 #else 589 void grpc_cq_internal_unref(grpc_completion_queue* cq) { 590 #endif 591 if (gpr_unref(&cq->owning_refs)) { 592 cq->vtable->destroy(DATA_FROM_CQ(cq)); 593 cq->poller_vtable->destroy(POLLSET_FROM_CQ(cq)); 594 #ifndef NDEBUG 595 gpr_free(cq->outstanding_tags); 596 #endif 597 gpr_free(cq); 598 } 599 } 600 601 #ifndef NDEBUG 602 static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) { 603 int found = 0; 604 if (lock_cq) { 605 gpr_mu_lock(cq->mu); 606 } 607 608 for (int i = 0; i < static_cast<int>(cq->outstanding_tag_count); i++) { 609 if (cq->outstanding_tags[i] == tag) { 610 cq->outstanding_tag_count--; 611 GPR_SWAP(void*, cq->outstanding_tags[i], 612 cq->outstanding_tags[cq->outstanding_tag_count]); 613 found = 1; 614 break; 615 } 616 } 617 618 if (lock_cq) { 619 gpr_mu_unlock(cq->mu); 620 } 621 622 GPR_ASSERT(found); 623 } 624 #else 625 static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) {} 626 #endif 627 628 /* Atomically increments a counter only if the counter is not zero. Returns 629 * true if the increment was successful; false if the counter is zero */ 630 static bool atm_inc_if_nonzero(gpr_atm* counter) { 631 while (true) { 632 gpr_atm count = gpr_atm_acq_load(counter); 633 /* If zero, we are done. If not, we must to a CAS (instead of an atomic 634 * increment) to maintain the contract: do not increment the counter if it 635 * is zero. */ 636 if (count == 0) { 637 return false; 638 } else if (gpr_atm_full_cas(counter, count, count + 1)) { 639 break; 640 } 641 } 642 643 return true; 644 } 645 646 static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag) { 647 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq); 648 return atm_inc_if_nonzero(&cqd->pending_events); 649 } 650 651 static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag) { 652 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq); 653 return atm_inc_if_nonzero(&cqd->pending_events); 654 } 655 656 static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag) { 657 cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq); 658 return atm_inc_if_nonzero(&cqd->pending_events); 659 } 660 661 bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) { 662 #ifndef NDEBUG 663 gpr_mu_lock(cq->mu); 664 if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) { 665 cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity); 666 cq->outstanding_tags = static_cast<void**>(gpr_realloc( 667 cq->outstanding_tags, 668 sizeof(*cq->outstanding_tags) * cq->outstanding_tag_capacity)); 669 } 670 cq->outstanding_tags[cq->outstanding_tag_count++] = tag; 671 gpr_mu_unlock(cq->mu); 672 #endif 673 return cq->vtable->begin_op(cq, tag); 674 } 675 676 /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a 677 * completion 678 * type of GRPC_CQ_NEXT) */ 679 static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag, 680 grpc_error* error, 681 void (*done)(void* done_arg, 682 grpc_cq_completion* storage), 683 void* done_arg, grpc_cq_completion* storage) { 684 GPR_TIMER_SCOPE("cq_end_op_for_next", 0); 685 686 if (grpc_api_trace.enabled() || 687 (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE)) { 688 const char* errmsg = grpc_error_string(error); 689 GRPC_API_TRACE( 690 "cq_end_op_for_next(cq=%p, tag=%p, error=%s, " 691 "done=%p, done_arg=%p, storage=%p)", 692 6, (cq, tag, errmsg, done, done_arg, storage)); 693 if (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE) { 694 gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); 695 } 696 } 697 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq); 698 int is_success = (error == GRPC_ERROR_NONE); 699 700 storage->tag = tag; 701 storage->done = done; 702 storage->done_arg = done_arg; 703 storage->next = static_cast<uintptr_t>(is_success); 704 705 cq_check_tag(cq, tag, true); /* Used in debug builds only */ 706 707 if ((grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == cq && 708 (grpc_cq_completion*)gpr_tls_get(&g_cached_event) == nullptr) { 709 gpr_tls_set(&g_cached_event, (intptr_t)storage); 710 } else { 711 /* Add the completion to the queue */ 712 bool is_first = cq_event_queue_push(&cqd->queue, storage); 713 gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); 714 715 /* Since we do not hold the cq lock here, it is important to do an 'acquire' 716 load here (instead of a 'no_barrier' load) to match with the release 717 store 718 (done via gpr_atm_full_fetch_add(pending_events, -1)) in cq_shutdown_next 719 */ 720 bool will_definitely_shutdown = gpr_atm_acq_load(&cqd->pending_events) == 1; 721 722 if (!will_definitely_shutdown) { 723 /* Only kick if this is the first item queued */ 724 if (is_first) { 725 gpr_mu_lock(cq->mu); 726 grpc_error* kick_error = 727 cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr); 728 gpr_mu_unlock(cq->mu); 729 730 if (kick_error != GRPC_ERROR_NONE) { 731 const char* msg = grpc_error_string(kick_error); 732 gpr_log(GPR_ERROR, "Kick failed: %s", msg); 733 GRPC_ERROR_UNREF(kick_error); 734 } 735 } 736 if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { 737 GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); 738 gpr_mu_lock(cq->mu); 739 cq_finish_shutdown_next(cq); 740 gpr_mu_unlock(cq->mu); 741 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down"); 742 } 743 } else { 744 GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); 745 gpr_atm_rel_store(&cqd->pending_events, 0); 746 gpr_mu_lock(cq->mu); 747 cq_finish_shutdown_next(cq); 748 gpr_mu_unlock(cq->mu); 749 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down"); 750 } 751 } 752 753 GRPC_ERROR_UNREF(error); 754 } 755 756 /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a 757 * completion 758 * type of GRPC_CQ_PLUCK) */ 759 static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag, 760 grpc_error* error, 761 void (*done)(void* done_arg, 762 grpc_cq_completion* storage), 763 void* done_arg, grpc_cq_completion* storage) { 764 GPR_TIMER_SCOPE("cq_end_op_for_pluck", 0); 765 766 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq); 767 int is_success = (error == GRPC_ERROR_NONE); 768 769 if (grpc_api_trace.enabled() || 770 (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE)) { 771 const char* errmsg = grpc_error_string(error); 772 GRPC_API_TRACE( 773 "cq_end_op_for_pluck(cq=%p, tag=%p, error=%s, " 774 "done=%p, done_arg=%p, storage=%p)", 775 6, (cq, tag, errmsg, done, done_arg, storage)); 776 if (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE) { 777 gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); 778 } 779 } 780 781 storage->tag = tag; 782 storage->done = done; 783 storage->done_arg = done_arg; 784 storage->next = 785 ((uintptr_t)&cqd->completed_head) | (static_cast<uintptr_t>(is_success)); 786 787 gpr_mu_lock(cq->mu); 788 cq_check_tag(cq, tag, false); /* Used in debug builds only */ 789 790 /* Add to the list of completions */ 791 gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); 792 cqd->completed_tail->next = 793 ((uintptr_t)storage) | (1u & cqd->completed_tail->next); 794 cqd->completed_tail = storage; 795 796 if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { 797 cq_finish_shutdown_pluck(cq); 798 gpr_mu_unlock(cq->mu); 799 } else { 800 grpc_pollset_worker* pluck_worker = nullptr; 801 for (int i = 0; i < cqd->num_pluckers; i++) { 802 if (cqd->pluckers[i].tag == tag) { 803 pluck_worker = *cqd->pluckers[i].worker; 804 break; 805 } 806 } 807 808 grpc_error* kick_error = 809 cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), pluck_worker); 810 811 gpr_mu_unlock(cq->mu); 812 813 if (kick_error != GRPC_ERROR_NONE) { 814 const char* msg = grpc_error_string(kick_error); 815 gpr_log(GPR_ERROR, "Kick failed: %s", msg); 816 817 GRPC_ERROR_UNREF(kick_error); 818 } 819 } 820 821 GRPC_ERROR_UNREF(error); 822 } 823 824 /* Complete an event on a completion queue of type GRPC_CQ_CALLBACK */ 825 static void cq_end_op_for_callback( 826 grpc_completion_queue* cq, void* tag, grpc_error* error, 827 void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, 828 grpc_cq_completion* storage) { 829 GPR_TIMER_SCOPE("cq_end_op_for_callback", 0); 830 831 cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq); 832 bool is_success = (error == GRPC_ERROR_NONE); 833 834 if (grpc_api_trace.enabled() || 835 (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE)) { 836 const char* errmsg = grpc_error_string(error); 837 GRPC_API_TRACE( 838 "cq_end_op_for_callback(cq=%p, tag=%p, error=%s, " 839 "done=%p, done_arg=%p, storage=%p)", 840 6, (cq, tag, errmsg, done, done_arg, storage)); 841 if (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE) { 842 gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); 843 } 844 } 845 846 // The callback-based CQ isn't really a queue at all and thus has no need 847 // for reserved storage. Invoke the done callback right away to release it. 848 done(done_arg, storage); 849 850 gpr_mu_lock(cq->mu); 851 cq_check_tag(cq, tag, false); /* Used in debug builds only */ 852 853 gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); 854 if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { 855 cq_finish_shutdown_callback(cq); 856 gpr_mu_unlock(cq->mu); 857 } else { 858 gpr_mu_unlock(cq->mu); 859 } 860 861 GRPC_ERROR_UNREF(error); 862 863 auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(tag); 864 (*functor->functor_run)(functor, is_success); 865 } 866 867 void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error, 868 void (*done)(void* done_arg, grpc_cq_completion* storage), 869 void* done_arg, grpc_cq_completion* storage) { 870 cq->vtable->end_op(cq, tag, error, done, done_arg, storage); 871 } 872 873 typedef struct { 874 gpr_atm last_seen_things_queued_ever; 875 grpc_completion_queue* cq; 876 grpc_millis deadline; 877 grpc_cq_completion* stolen_completion; 878 void* tag; /* for pluck */ 879 bool first_loop; 880 } cq_is_finished_arg; 881 882 class ExecCtxNext : public grpc_core::ExecCtx { 883 public: 884 ExecCtxNext(void* arg) : ExecCtx(0), check_ready_to_finish_arg_(arg) {} 885 886 bool CheckReadyToFinish() override { 887 cq_is_finished_arg* a = 888 static_cast<cq_is_finished_arg*>(check_ready_to_finish_arg_); 889 grpc_completion_queue* cq = a->cq; 890 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq); 891 GPR_ASSERT(a->stolen_completion == nullptr); 892 893 gpr_atm current_last_seen_things_queued_ever = 894 gpr_atm_no_barrier_load(&cqd->things_queued_ever); 895 896 if (current_last_seen_things_queued_ever != 897 a->last_seen_things_queued_ever) { 898 a->last_seen_things_queued_ever = 899 gpr_atm_no_barrier_load(&cqd->things_queued_ever); 900 901 /* Pop a cq_completion from the queue. Returns NULL if the queue is empty 902 * might return NULL in some cases even if the queue is not empty; but 903 * that 904 * is ok and doesn't affect correctness. Might effect the tail latencies a 905 * bit) */ 906 a->stolen_completion = cq_event_queue_pop(&cqd->queue); 907 if (a->stolen_completion != nullptr) { 908 return true; 909 } 910 } 911 return !a->first_loop && a->deadline < grpc_core::ExecCtx::Get()->Now(); 912 } 913 914 private: 915 void* check_ready_to_finish_arg_; 916 }; 917 918 #ifndef NDEBUG 919 static void dump_pending_tags(grpc_completion_queue* cq) { 920 if (!grpc_trace_pending_tags.enabled()) return; 921 922 gpr_strvec v; 923 gpr_strvec_init(&v); 924 gpr_strvec_add(&v, gpr_strdup("PENDING TAGS:")); 925 gpr_mu_lock(cq->mu); 926 for (size_t i = 0; i < cq->outstanding_tag_count; i++) { 927 char* s; 928 gpr_asprintf(&s, " %p", cq->outstanding_tags[i]); 929 gpr_strvec_add(&v, s); 930 } 931 gpr_mu_unlock(cq->mu); 932 char* out = gpr_strvec_flatten(&v, nullptr); 933 gpr_strvec_destroy(&v); 934 gpr_log(GPR_DEBUG, "%s", out); 935 gpr_free(out); 936 } 937 #else 938 static void dump_pending_tags(grpc_completion_queue* cq) {} 939 #endif 940 941 static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, 942 void* reserved) { 943 GPR_TIMER_SCOPE("grpc_completion_queue_next", 0); 944 945 grpc_event ret; 946 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq); 947 948 GRPC_API_TRACE( 949 "grpc_completion_queue_next(" 950 "cq=%p, " 951 "deadline=gpr_timespec { tv_sec: %" PRId64 952 ", tv_nsec: %d, clock_type: %d }, " 953 "reserved=%p)", 954 5, 955 (cq, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type, 956 reserved)); 957 GPR_ASSERT(!reserved); 958 959 dump_pending_tags(cq); 960 961 GRPC_CQ_INTERNAL_REF(cq, "next"); 962 963 grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline); 964 cq_is_finished_arg is_finished_arg = { 965 gpr_atm_no_barrier_load(&cqd->things_queued_ever), 966 cq, 967 deadline_millis, 968 nullptr, 969 nullptr, 970 true}; 971 ExecCtxNext exec_ctx(&is_finished_arg); 972 for (;;) { 973 grpc_millis iteration_deadline = deadline_millis; 974 975 if (is_finished_arg.stolen_completion != nullptr) { 976 grpc_cq_completion* c = is_finished_arg.stolen_completion; 977 is_finished_arg.stolen_completion = nullptr; 978 ret.type = GRPC_OP_COMPLETE; 979 ret.success = c->next & 1u; 980 ret.tag = c->tag; 981 c->done(c->done_arg, c); 982 break; 983 } 984 985 grpc_cq_completion* c = cq_event_queue_pop(&cqd->queue); 986 987 if (c != nullptr) { 988 ret.type = GRPC_OP_COMPLETE; 989 ret.success = c->next & 1u; 990 ret.tag = c->tag; 991 c->done(c->done_arg, c); 992 break; 993 } else { 994 /* If c == NULL it means either the queue is empty OR in an transient 995 inconsistent state. If it is the latter, we shold do a 0-timeout poll 996 so that the thread comes back quickly from poll to make a second 997 attempt at popping. Not doing this can potentially deadlock this 998 thread forever (if the deadline is infinity) */ 999 if (cq_event_queue_num_items(&cqd->queue) > 0) { 1000 iteration_deadline = 0; 1001 } 1002 } 1003 1004 if (gpr_atm_acq_load(&cqd->pending_events) == 0) { 1005 /* Before returning, check if the queue has any items left over (since 1006 gpr_mpscq_pop() can sometimes return NULL even if the queue is not 1007 empty. If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */ 1008 if (cq_event_queue_num_items(&cqd->queue) > 0) { 1009 /* Go to the beginning of the loop. No point doing a poll because 1010 (cq->shutdown == true) is only possible when there is no pending 1011 work (i.e cq->pending_events == 0) and any outstanding completion 1012 events should have already been queued on this cq */ 1013 continue; 1014 } 1015 1016 memset(&ret, 0, sizeof(ret)); 1017 ret.type = GRPC_QUEUE_SHUTDOWN; 1018 break; 1019 } 1020 1021 if (!is_finished_arg.first_loop && 1022 grpc_core::ExecCtx::Get()->Now() >= deadline_millis) { 1023 memset(&ret, 0, sizeof(ret)); 1024 ret.type = GRPC_QUEUE_TIMEOUT; 1025 dump_pending_tags(cq); 1026 break; 1027 } 1028 1029 /* The main polling work happens in grpc_pollset_work */ 1030 gpr_mu_lock(cq->mu); 1031 cq->num_polls++; 1032 grpc_error* err = cq->poller_vtable->work(POLLSET_FROM_CQ(cq), nullptr, 1033 iteration_deadline); 1034 gpr_mu_unlock(cq->mu); 1035 1036 if (err != GRPC_ERROR_NONE) { 1037 const char* msg = grpc_error_string(err); 1038 gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg); 1039 1040 GRPC_ERROR_UNREF(err); 1041 memset(&ret, 0, sizeof(ret)); 1042 ret.type = GRPC_QUEUE_TIMEOUT; 1043 dump_pending_tags(cq); 1044 break; 1045 } 1046 is_finished_arg.first_loop = false; 1047 } 1048 1049 if (cq_event_queue_num_items(&cqd->queue) > 0 && 1050 gpr_atm_acq_load(&cqd->pending_events) > 0) { 1051 gpr_mu_lock(cq->mu); 1052 cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr); 1053 gpr_mu_unlock(cq->mu); 1054 } 1055 1056 GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret); 1057 GRPC_CQ_INTERNAL_UNREF(cq, "next"); 1058 1059 GPR_ASSERT(is_finished_arg.stolen_completion == nullptr); 1060 1061 return ret; 1062 } 1063 1064 /* Finishes the completion queue shutdown. This means that there are no more 1065 completion events / tags expected from the completion queue 1066 - Must be called under completion queue lock 1067 - Must be called only once in completion queue's lifetime 1068 - grpc_completion_queue_shutdown() MUST have been called before calling 1069 this function */ 1070 static void cq_finish_shutdown_next(grpc_completion_queue* cq) { 1071 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq); 1072 1073 GPR_ASSERT(cqd->shutdown_called); 1074 GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0); 1075 1076 cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); 1077 } 1078 1079 static void cq_shutdown_next(grpc_completion_queue* cq) { 1080 cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq); 1081 1082 /* Need an extra ref for cq here because: 1083 * We call cq_finish_shutdown_next() below, that would call pollset shutdown. 1084 * Pollset shutdown decrements the cq ref count which can potentially destroy 1085 * the cq (if that happens to be the last ref). 1086 * Creating an extra ref here prevents the cq from getting destroyed while 1087 * this function is still active */ 1088 GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); 1089 gpr_mu_lock(cq->mu); 1090 if (cqd->shutdown_called) { 1091 gpr_mu_unlock(cq->mu); 1092 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down"); 1093 return; 1094 } 1095 cqd->shutdown_called = true; 1096 /* Doing a full_fetch_add (i.e acq/release) here to match with 1097 * cq_begin_op_for_next and and cq_end_op_for_next functions which read/write 1098 * on this counter without necessarily holding a lock on cq */ 1099 if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { 1100 cq_finish_shutdown_next(cq); 1101 } 1102 gpr_mu_unlock(cq->mu); 1103 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down"); 1104 } 1105 1106 grpc_event grpc_completion_queue_next(grpc_completion_queue* cq, 1107 gpr_timespec deadline, void* reserved) { 1108 return cq->vtable->next(cq, deadline, reserved); 1109 } 1110 1111 static int add_plucker(grpc_completion_queue* cq, void* tag, 1112 grpc_pollset_worker** worker) { 1113 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq); 1114 if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) { 1115 return 0; 1116 } 1117 cqd->pluckers[cqd->num_pluckers].tag = tag; 1118 cqd->pluckers[cqd->num_pluckers].worker = worker; 1119 cqd->num_pluckers++; 1120 return 1; 1121 } 1122 1123 static void del_plucker(grpc_completion_queue* cq, void* tag, 1124 grpc_pollset_worker** worker) { 1125 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq); 1126 for (int i = 0; i < cqd->num_pluckers; i++) { 1127 if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) { 1128 cqd->num_pluckers--; 1129 GPR_SWAP(plucker, cqd->pluckers[i], cqd->pluckers[cqd->num_pluckers]); 1130 return; 1131 } 1132 } 1133 GPR_UNREACHABLE_CODE(return ); 1134 } 1135 1136 class ExecCtxPluck : public grpc_core::ExecCtx { 1137 public: 1138 ExecCtxPluck(void* arg) : ExecCtx(0), check_ready_to_finish_arg_(arg) {} 1139 1140 bool CheckReadyToFinish() override { 1141 cq_is_finished_arg* a = 1142 static_cast<cq_is_finished_arg*>(check_ready_to_finish_arg_); 1143 grpc_completion_queue* cq = a->cq; 1144 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq); 1145 1146 GPR_ASSERT(a->stolen_completion == nullptr); 1147 gpr_atm current_last_seen_things_queued_ever = 1148 gpr_atm_no_barrier_load(&cqd->things_queued_ever); 1149 if (current_last_seen_things_queued_ever != 1150 a->last_seen_things_queued_ever) { 1151 gpr_mu_lock(cq->mu); 1152 a->last_seen_things_queued_ever = 1153 gpr_atm_no_barrier_load(&cqd->things_queued_ever); 1154 grpc_cq_completion* c; 1155 grpc_cq_completion* prev = &cqd->completed_head; 1156 while ((c = (grpc_cq_completion*)(prev->next & 1157 ~static_cast<uintptr_t>(1))) != 1158 &cqd->completed_head) { 1159 if (c->tag == a->tag) { 1160 prev->next = (prev->next & static_cast<uintptr_t>(1)) | 1161 (c->next & ~static_cast<uintptr_t>(1)); 1162 if (c == cqd->completed_tail) { 1163 cqd->completed_tail = prev; 1164 } 1165 gpr_mu_unlock(cq->mu); 1166 a->stolen_completion = c; 1167 return true; 1168 } 1169 prev = c; 1170 } 1171 gpr_mu_unlock(cq->mu); 1172 } 1173 return !a->first_loop && a->deadline < grpc_core::ExecCtx::Get()->Now(); 1174 } 1175 1176 private: 1177 void* check_ready_to_finish_arg_; 1178 }; 1179 1180 static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, 1181 gpr_timespec deadline, void* reserved) { 1182 GPR_TIMER_SCOPE("grpc_completion_queue_pluck", 0); 1183 1184 grpc_event ret; 1185 grpc_cq_completion* c; 1186 grpc_cq_completion* prev; 1187 grpc_pollset_worker* worker = nullptr; 1188 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq); 1189 1190 if (grpc_cq_pluck_trace.enabled()) { 1191 GRPC_API_TRACE( 1192 "grpc_completion_queue_pluck(" 1193 "cq=%p, tag=%p, " 1194 "deadline=gpr_timespec { tv_sec: %" PRId64 1195 ", tv_nsec: %d, clock_type: %d }, " 1196 "reserved=%p)", 1197 6, 1198 (cq, tag, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type, 1199 reserved)); 1200 } 1201 GPR_ASSERT(!reserved); 1202 1203 dump_pending_tags(cq); 1204 1205 GRPC_CQ_INTERNAL_REF(cq, "pluck"); 1206 gpr_mu_lock(cq->mu); 1207 grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline); 1208 cq_is_finished_arg is_finished_arg = { 1209 gpr_atm_no_barrier_load(&cqd->things_queued_ever), 1210 cq, 1211 deadline_millis, 1212 nullptr, 1213 tag, 1214 true}; 1215 ExecCtxPluck exec_ctx(&is_finished_arg); 1216 for (;;) { 1217 if (is_finished_arg.stolen_completion != nullptr) { 1218 gpr_mu_unlock(cq->mu); 1219 c = is_finished_arg.stolen_completion; 1220 is_finished_arg.stolen_completion = nullptr; 1221 ret.type = GRPC_OP_COMPLETE; 1222 ret.success = c->next & 1u; 1223 ret.tag = c->tag; 1224 c->done(c->done_arg, c); 1225 break; 1226 } 1227 prev = &cqd->completed_head; 1228 while ( 1229 (c = (grpc_cq_completion*)(prev->next & ~static_cast<uintptr_t>(1))) != 1230 &cqd->completed_head) { 1231 if (c->tag == tag) { 1232 prev->next = (prev->next & static_cast<uintptr_t>(1)) | 1233 (c->next & ~static_cast<uintptr_t>(1)); 1234 if (c == cqd->completed_tail) { 1235 cqd->completed_tail = prev; 1236 } 1237 gpr_mu_unlock(cq->mu); 1238 ret.type = GRPC_OP_COMPLETE; 1239 ret.success = c->next & 1u; 1240 ret.tag = c->tag; 1241 c->done(c->done_arg, c); 1242 goto done; 1243 } 1244 prev = c; 1245 } 1246 if (gpr_atm_no_barrier_load(&cqd->shutdown)) { 1247 gpr_mu_unlock(cq->mu); 1248 memset(&ret, 0, sizeof(ret)); 1249 ret.type = GRPC_QUEUE_SHUTDOWN; 1250 break; 1251 } 1252 if (!add_plucker(cq, tag, &worker)) { 1253 gpr_log(GPR_DEBUG, 1254 "Too many outstanding grpc_completion_queue_pluck calls: maximum " 1255 "is %d", 1256 GRPC_MAX_COMPLETION_QUEUE_PLUCKERS); 1257 gpr_mu_unlock(cq->mu); 1258 memset(&ret, 0, sizeof(ret)); 1259 /* TODO(ctiller): should we use a different result here */ 1260 ret.type = GRPC_QUEUE_TIMEOUT; 1261 dump_pending_tags(cq); 1262 break; 1263 } 1264 if (!is_finished_arg.first_loop && 1265 grpc_core::ExecCtx::Get()->Now() >= deadline_millis) { 1266 del_plucker(cq, tag, &worker); 1267 gpr_mu_unlock(cq->mu); 1268 memset(&ret, 0, sizeof(ret)); 1269 ret.type = GRPC_QUEUE_TIMEOUT; 1270 dump_pending_tags(cq); 1271 break; 1272 } 1273 cq->num_polls++; 1274 grpc_error* err = 1275 cq->poller_vtable->work(POLLSET_FROM_CQ(cq), &worker, deadline_millis); 1276 if (err != GRPC_ERROR_NONE) { 1277 del_plucker(cq, tag, &worker); 1278 gpr_mu_unlock(cq->mu); 1279 const char* msg = grpc_error_string(err); 1280 gpr_log(GPR_ERROR, "Completion queue pluck failed: %s", msg); 1281 1282 GRPC_ERROR_UNREF(err); 1283 memset(&ret, 0, sizeof(ret)); 1284 ret.type = GRPC_QUEUE_TIMEOUT; 1285 dump_pending_tags(cq); 1286 break; 1287 } 1288 is_finished_arg.first_loop = false; 1289 del_plucker(cq, tag, &worker); 1290 } 1291 done: 1292 GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret); 1293 GRPC_CQ_INTERNAL_UNREF(cq, "pluck"); 1294 1295 GPR_ASSERT(is_finished_arg.stolen_completion == nullptr); 1296 1297 return ret; 1298 } 1299 1300 grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag, 1301 gpr_timespec deadline, void* reserved) { 1302 return cq->vtable->pluck(cq, tag, deadline, reserved); 1303 } 1304 1305 static void cq_finish_shutdown_pluck(grpc_completion_queue* cq) { 1306 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq); 1307 1308 GPR_ASSERT(cqd->shutdown_called); 1309 GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown)); 1310 gpr_atm_no_barrier_store(&cqd->shutdown, 1); 1311 1312 cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); 1313 } 1314 1315 /* NOTE: This function is almost exactly identical to cq_shutdown_next() but 1316 * merging them is a bit tricky and probably not worth it */ 1317 static void cq_shutdown_pluck(grpc_completion_queue* cq) { 1318 cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq); 1319 1320 /* Need an extra ref for cq here because: 1321 * We call cq_finish_shutdown_pluck() below, that would call pollset shutdown. 1322 * Pollset shutdown decrements the cq ref count which can potentially destroy 1323 * the cq (if that happens to be the last ref). 1324 * Creating an extra ref here prevents the cq from getting destroyed while 1325 * this function is still active */ 1326 GRPC_CQ_INTERNAL_REF(cq, "shutting_down (pluck cq)"); 1327 gpr_mu_lock(cq->mu); 1328 if (cqd->shutdown_called) { 1329 gpr_mu_unlock(cq->mu); 1330 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)"); 1331 return; 1332 } 1333 cqd->shutdown_called = true; 1334 if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { 1335 cq_finish_shutdown_pluck(cq); 1336 } 1337 gpr_mu_unlock(cq->mu); 1338 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)"); 1339 } 1340 1341 static void cq_finish_shutdown_callback(grpc_completion_queue* cq) { 1342 cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq); 1343 auto* callback = cqd->shutdown_callback; 1344 1345 GPR_ASSERT(cqd->shutdown_called); 1346 1347 cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); 1348 (*callback->functor_run)(callback, true); 1349 } 1350 1351 static void cq_shutdown_callback(grpc_completion_queue* cq) { 1352 cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq); 1353 1354 /* Need an extra ref for cq here because: 1355 * We call cq_finish_shutdown_callback() below, which calls pollset shutdown. 1356 * Pollset shutdown decrements the cq ref count which can potentially destroy 1357 * the cq (if that happens to be the last ref). 1358 * Creating an extra ref here prevents the cq from getting destroyed while 1359 * this function is still active */ 1360 GRPC_CQ_INTERNAL_REF(cq, "shutting_down (callback cq)"); 1361 gpr_mu_lock(cq->mu); 1362 if (cqd->shutdown_called) { 1363 gpr_mu_unlock(cq->mu); 1364 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)"); 1365 return; 1366 } 1367 cqd->shutdown_called = true; 1368 if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { 1369 gpr_mu_unlock(cq->mu); 1370 cq_finish_shutdown_callback(cq); 1371 } else { 1372 gpr_mu_unlock(cq->mu); 1373 } 1374 GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)"); 1375 } 1376 1377 /* Shutdown simply drops a ref that we reserved at creation time; if we drop 1378 to zero here, then enter shutdown mode and wake up any waiters */ 1379 void grpc_completion_queue_shutdown(grpc_completion_queue* cq) { 1380 GPR_TIMER_SCOPE("grpc_completion_queue_shutdown", 0); 1381 grpc_core::ExecCtx exec_ctx; 1382 GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq)); 1383 cq->vtable->shutdown(cq); 1384 } 1385 1386 void grpc_completion_queue_destroy(grpc_completion_queue* cq) { 1387 GPR_TIMER_SCOPE("grpc_completion_queue_destroy", 0); 1388 GRPC_API_TRACE("grpc_completion_queue_destroy(cq=%p)", 1, (cq)); 1389 grpc_completion_queue_shutdown(cq); 1390 1391 grpc_core::ExecCtx exec_ctx; 1392 GRPC_CQ_INTERNAL_UNREF(cq, "destroy"); 1393 } 1394 1395 grpc_pollset* grpc_cq_pollset(grpc_completion_queue* cq) { 1396 return cq->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cq) : nullptr; 1397 } 1398 1399 bool grpc_cq_can_listen(grpc_completion_queue* cq) { 1400 return cq->poller_vtable->can_listen; 1401 } 1402