Home | History | Annotate | Download | only in common
      1 /*
      2  * Copyright 2015 gRPC authors.
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *     http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  *
     16  */
     17 
     18 #include <grpcpp/completion_queue.h>
     19 
     20 #include <memory>
     21 
     22 #include <grpc/grpc.h>
     23 #include <grpc/support/log.h>
     24 #include <grpcpp/impl/grpc_library.h>
     25 #include <grpcpp/support/time.h>
     26 
     27 namespace grpc {
     28 
     29 static internal::GrpcLibraryInitializer g_gli_initializer;
     30 
     31 // 'CompletionQueue' constructor can safely call GrpcLibraryCodegen(false) here
     32 // i.e not have GrpcLibraryCodegen call grpc_init(). This is because, to create
     33 // a 'grpc_completion_queue' instance (which is being passed as the input to
     34 // this constructor), one must have already called grpc_init().
     35 CompletionQueue::CompletionQueue(grpc_completion_queue* take)
     36     : GrpcLibraryCodegen(false), cq_(take) {
     37   InitialAvalanching();
     38 }
     39 
     40 void CompletionQueue::Shutdown() {
     41   g_gli_initializer.summon();
     42   CompleteAvalanching();
     43 }
     44 
     45 void CompletionQueue::CompleteAvalanching() {
     46   // Check if this was the last avalanching operation
     47   if (gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_,
     48                                    static_cast<gpr_atm>(-1)) == 1) {
     49     grpc_completion_queue_shutdown(cq_);
     50   }
     51 }
     52 
     53 CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
     54     void** tag, bool* ok, gpr_timespec deadline) {
     55   for (;;) {
     56     auto ev = grpc_completion_queue_next(cq_, deadline, nullptr);
     57     switch (ev.type) {
     58       case GRPC_QUEUE_TIMEOUT:
     59         return TIMEOUT;
     60       case GRPC_QUEUE_SHUTDOWN:
     61         return SHUTDOWN;
     62       case GRPC_OP_COMPLETE:
     63         auto cq_tag = static_cast<internal::CompletionQueueTag*>(ev.tag);
     64         *ok = ev.success != 0;
     65         *tag = cq_tag;
     66         if (cq_tag->FinalizeResult(tag, ok)) {
     67           return GOT_EVENT;
     68         }
     69         break;
     70     }
     71   }
     72 }
     73 
     74 CompletionQueue::CompletionQueueTLSCache::CompletionQueueTLSCache(
     75     CompletionQueue* cq)
     76     : cq_(cq), flushed_(false) {
     77   grpc_completion_queue_thread_local_cache_init(cq_->cq_);
     78 }
     79 
     80 CompletionQueue::CompletionQueueTLSCache::~CompletionQueueTLSCache() {
     81   GPR_ASSERT(flushed_);
     82 }
     83 
     84 bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) {
     85   int res = 0;
     86   void* res_tag;
     87   flushed_ = true;
     88   if (grpc_completion_queue_thread_local_cache_flush(cq_->cq_, &res_tag,
     89                                                      &res)) {
     90     auto cq_tag = static_cast<internal::CompletionQueueTag*>(res_tag);
     91     *ok = res == 1;
     92     if (cq_tag->FinalizeResult(tag, ok)) {
     93       return true;
     94     }
     95   }
     96   return false;
     97 }
     98 
     99 }  // namespace grpc
    100