Home | History | Annotate | Download | only in iomgr
      1 /*
      2  *
      3  * Copyright 2017 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #include <grpc/support/port_platform.h>
     20 
     21 #include "src/core/lib/iomgr/call_combiner.h"
     22 
     23 #include <inttypes.h>
     24 
     25 #include <grpc/support/log.h>
     26 #include "src/core/lib/debug/stats.h"
     27 #include "src/core/lib/profiling/timers.h"
     28 
     29 grpc_core::TraceFlag grpc_call_combiner_trace(false, "call_combiner");
     30 
     31 static grpc_error* decode_cancel_state_error(gpr_atm cancel_state) {
     32   if (cancel_state & 1) {
     33     return (grpc_error*)(cancel_state & ~static_cast<gpr_atm>(1));
     34   }
     35   return GRPC_ERROR_NONE;
     36 }
     37 
     38 static gpr_atm encode_cancel_state_error(grpc_error* error) {
     39   return static_cast<gpr_atm>(1) | (gpr_atm)error;
     40 }
     41 
     42 void grpc_call_combiner_init(grpc_call_combiner* call_combiner) {
     43   gpr_mpscq_init(&call_combiner->queue);
     44 }
     45 
     46 void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner) {
     47   gpr_mpscq_destroy(&call_combiner->queue);
     48   GRPC_ERROR_UNREF(decode_cancel_state_error(call_combiner->cancel_state));
     49 }
     50 
     51 #ifndef NDEBUG
     52 #define DEBUG_ARGS , const char *file, int line
     53 #define DEBUG_FMT_STR "%s:%d: "
     54 #define DEBUG_FMT_ARGS , file, line
     55 #else
     56 #define DEBUG_ARGS
     57 #define DEBUG_FMT_STR
     58 #define DEBUG_FMT_ARGS
     59 #endif
     60 
     61 void grpc_call_combiner_start(grpc_call_combiner* call_combiner,
     62                               grpc_closure* closure,
     63                               grpc_error* error DEBUG_ARGS,
     64                               const char* reason) {
     65   GPR_TIMER_SCOPE("call_combiner_start", 0);
     66   if (grpc_call_combiner_trace.enabled()) {
     67     gpr_log(GPR_INFO,
     68             "==> grpc_call_combiner_start() [%p] closure=%p [" DEBUG_FMT_STR
     69             "%s] error=%s",
     70             call_combiner, closure DEBUG_FMT_ARGS, reason,
     71             grpc_error_string(error));
     72   }
     73   size_t prev_size = static_cast<size_t>(
     74       gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)1));
     75   if (grpc_call_combiner_trace.enabled()) {
     76     gpr_log(GPR_INFO, "  size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
     77             prev_size + 1);
     78   }
     79   GRPC_STATS_INC_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS();
     80   if (prev_size == 0) {
     81     GRPC_STATS_INC_CALL_COMBINER_LOCKS_INITIATED();
     82 
     83     GPR_TIMER_MARK("call_combiner_initiate", 0);
     84     if (grpc_call_combiner_trace.enabled()) {
     85       gpr_log(GPR_INFO, "  EXECUTING IMMEDIATELY");
     86     }
     87     // Queue was empty, so execute this closure immediately.
     88     GRPC_CLOSURE_SCHED(closure, error);
     89   } else {
     90     if (grpc_call_combiner_trace.enabled()) {
     91       gpr_log(GPR_INFO, "  QUEUING");
     92     }
     93     // Queue was not empty, so add closure to queue.
     94     closure->error_data.error = error;
     95     gpr_mpscq_push(&call_combiner->queue,
     96                    reinterpret_cast<gpr_mpscq_node*>(closure));
     97   }
     98 }
     99 
    100 void grpc_call_combiner_stop(grpc_call_combiner* call_combiner DEBUG_ARGS,
    101                              const char* reason) {
    102   GPR_TIMER_SCOPE("call_combiner_stop", 0);
    103   if (grpc_call_combiner_trace.enabled()) {
    104     gpr_log(GPR_INFO,
    105             "==> grpc_call_combiner_stop() [%p] [" DEBUG_FMT_STR "%s]",
    106             call_combiner DEBUG_FMT_ARGS, reason);
    107   }
    108   size_t prev_size = static_cast<size_t>(
    109       gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)-1));
    110   if (grpc_call_combiner_trace.enabled()) {
    111     gpr_log(GPR_INFO, "  size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
    112             prev_size - 1);
    113   }
    114   GPR_ASSERT(prev_size >= 1);
    115   if (prev_size > 1) {
    116     while (true) {
    117       if (grpc_call_combiner_trace.enabled()) {
    118         gpr_log(GPR_INFO, "  checking queue");
    119       }
    120       bool empty;
    121       grpc_closure* closure = reinterpret_cast<grpc_closure*>(
    122           gpr_mpscq_pop_and_check_end(&call_combiner->queue, &empty));
    123       if (closure == nullptr) {
    124         // This can happen either due to a race condition within the mpscq
    125         // code or because of a race with grpc_call_combiner_start().
    126         if (grpc_call_combiner_trace.enabled()) {
    127           gpr_log(GPR_INFO, "  queue returned no result; checking again");
    128         }
    129         continue;
    130       }
    131       if (grpc_call_combiner_trace.enabled()) {
    132         gpr_log(GPR_INFO, "  EXECUTING FROM QUEUE: closure=%p error=%s",
    133                 closure, grpc_error_string(closure->error_data.error));
    134       }
    135       GRPC_CLOSURE_SCHED(closure, closure->error_data.error);
    136       break;
    137     }
    138   } else if (grpc_call_combiner_trace.enabled()) {
    139     gpr_log(GPR_INFO, "  queue empty");
    140   }
    141 }
    142 
    143 void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner,
    144                                              grpc_closure* closure) {
    145   GRPC_STATS_INC_CALL_COMBINER_SET_NOTIFY_ON_CANCEL();
    146   while (true) {
    147     // Decode original state.
    148     gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state);
    149     grpc_error* original_error = decode_cancel_state_error(original_state);
    150     // If error is set, invoke the cancellation closure immediately.
    151     // Otherwise, store the new closure.
    152     if (original_error != GRPC_ERROR_NONE) {
    153       if (grpc_call_combiner_trace.enabled()) {
    154         gpr_log(GPR_INFO,
    155                 "call_combiner=%p: scheduling notify_on_cancel callback=%p "
    156                 "for pre-existing cancellation",
    157                 call_combiner, closure);
    158       }
    159       GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_REF(original_error));
    160       break;
    161     } else {
    162       if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state,
    163                            (gpr_atm)closure)) {
    164         if (grpc_call_combiner_trace.enabled()) {
    165           gpr_log(GPR_INFO, "call_combiner=%p: setting notify_on_cancel=%p",
    166                   call_combiner, closure);
    167         }
    168         // If we replaced an earlier closure, invoke the original
    169         // closure with GRPC_ERROR_NONE.  This allows callers to clean
    170         // up any resources they may be holding for the callback.
    171         if (original_state != 0) {
    172           closure = (grpc_closure*)original_state;
    173           if (grpc_call_combiner_trace.enabled()) {
    174             gpr_log(GPR_INFO,
    175                     "call_combiner=%p: scheduling old cancel callback=%p",
    176                     call_combiner, closure);
    177           }
    178           GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
    179         }
    180         break;
    181       }
    182     }
    183     // cas failed, try again.
    184   }
    185 }
    186 
    187 void grpc_call_combiner_cancel(grpc_call_combiner* call_combiner,
    188                                grpc_error* error) {
    189   GRPC_STATS_INC_CALL_COMBINER_CANCELLED();
    190   while (true) {
    191     gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state);
    192     grpc_error* original_error = decode_cancel_state_error(original_state);
    193     if (original_error != GRPC_ERROR_NONE) {
    194       GRPC_ERROR_UNREF(error);
    195       break;
    196     }
    197     if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state,
    198                          encode_cancel_state_error(error))) {
    199       if (original_state != 0) {
    200         grpc_closure* notify_on_cancel = (grpc_closure*)original_state;
    201         if (grpc_call_combiner_trace.enabled()) {
    202           gpr_log(GPR_INFO,
    203                   "call_combiner=%p: scheduling notify_on_cancel callback=%p",
    204                   call_combiner, notify_on_cancel);
    205         }
    206         GRPC_CLOSURE_SCHED(notify_on_cancel, GRPC_ERROR_REF(error));
    207       }
    208       break;
    209     }
    210     // cas failed, try again.
    211   }
    212 }
    213