1 /* 2 * Copyright 2015 gRPC authors. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 * 16 */ 17 18 #include <grpcpp/completion_queue.h> 19 20 #include <memory> 21 22 #include <grpc/grpc.h> 23 #include <grpc/support/log.h> 24 #include <grpcpp/impl/grpc_library.h> 25 #include <grpcpp/support/time.h> 26 27 namespace grpc { 28 29 static internal::GrpcLibraryInitializer g_gli_initializer; 30 31 // 'CompletionQueue' constructor can safely call GrpcLibraryCodegen(false) here 32 // i.e not have GrpcLibraryCodegen call grpc_init(). This is because, to create 33 // a 'grpc_completion_queue' instance (which is being passed as the input to 34 // this constructor), one must have already called grpc_init(). 35 CompletionQueue::CompletionQueue(grpc_completion_queue* take) 36 : GrpcLibraryCodegen(false), cq_(take) { 37 InitialAvalanching(); 38 } 39 40 void CompletionQueue::Shutdown() { 41 g_gli_initializer.summon(); 42 CompleteAvalanching(); 43 } 44 45 void CompletionQueue::CompleteAvalanching() { 46 // Check if this was the last avalanching operation 47 if (gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_, 48 static_cast<gpr_atm>(-1)) == 1) { 49 grpc_completion_queue_shutdown(cq_); 50 } 51 } 52 53 CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal( 54 void** tag, bool* ok, gpr_timespec deadline) { 55 for (;;) { 56 auto ev = grpc_completion_queue_next(cq_, deadline, nullptr); 57 switch (ev.type) { 58 case GRPC_QUEUE_TIMEOUT: 59 return TIMEOUT; 60 case GRPC_QUEUE_SHUTDOWN: 61 return SHUTDOWN; 62 case GRPC_OP_COMPLETE: 63 auto cq_tag = static_cast<internal::CompletionQueueTag*>(ev.tag); 64 *ok = ev.success != 0; 65 *tag = cq_tag; 66 if (cq_tag->FinalizeResult(tag, ok)) { 67 return GOT_EVENT; 68 } 69 break; 70 } 71 } 72 } 73 74 CompletionQueue::CompletionQueueTLSCache::CompletionQueueTLSCache( 75 CompletionQueue* cq) 76 : cq_(cq), flushed_(false) { 77 grpc_completion_queue_thread_local_cache_init(cq_->cq_); 78 } 79 80 CompletionQueue::CompletionQueueTLSCache::~CompletionQueueTLSCache() { 81 GPR_ASSERT(flushed_); 82 } 83 84 bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) { 85 int res = 0; 86 void* res_tag; 87 flushed_ = true; 88 if (grpc_completion_queue_thread_local_cache_flush(cq_->cq_, &res_tag, 89 &res)) { 90 auto cq_tag = static_cast<internal::CompletionQueueTag*>(res_tag); 91 *ok = res == 1; 92 if (cq_tag->FinalizeResult(tag, ok)) { 93 return true; 94 } 95 } 96 return false; 97 } 98 99 } // namespace grpc 100