Home | History | Annotate | Download | only in iomgr
      1 /*
      2  *
      3  * Copyright 2017 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #ifndef GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H
     20 #define GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H
     21 
     22 #include <grpc/support/port_platform.h>
     23 
     24 #include <stddef.h>
     25 
     26 #include <grpc/support/atm.h>
     27 
     28 #include "src/core/lib/gpr/mpscq.h"
     29 #include "src/core/lib/gprpp/inlined_vector.h"
     30 #include "src/core/lib/iomgr/closure.h"
     31 
     32 // A simple, lock-free mechanism for serializing activity related to a
     33 // single call.  This is similar to a combiner but is more lightweight.
     34 //
     35 // It requires the callback (or, in the common case where the callback
     36 // actually kicks off a chain of callbacks, the last callback in that
     37 // chain) to explicitly indicate (by calling GRPC_CALL_COMBINER_STOP())
     38 // when it is done with the action that was kicked off by the original
     39 // callback.
     40 
     41 extern grpc_core::TraceFlag grpc_call_combiner_trace;
     42 
     43 typedef struct {
     44   gpr_atm size;  // size_t, num closures in queue or currently executing
     45   gpr_mpscq queue;
     46   // Either 0 (if not cancelled and no cancellation closure set),
     47   // a grpc_closure* (if the lowest bit is 0),
     48   // or a grpc_error* (if the lowest bit is 1).
     49   gpr_atm cancel_state;
     50 } grpc_call_combiner;
     51 
     52 // Assumes memory was initialized to zero.
     53 void grpc_call_combiner_init(grpc_call_combiner* call_combiner);
     54 
     55 void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner);
     56 
     57 #ifndef NDEBUG
     58 #define GRPC_CALL_COMBINER_START(call_combiner, closure, error, reason)   \
     59   grpc_call_combiner_start((call_combiner), (closure), (error), __FILE__, \
     60                            __LINE__, (reason))
     61 #define GRPC_CALL_COMBINER_STOP(call_combiner, reason) \
     62   grpc_call_combiner_stop((call_combiner), __FILE__, __LINE__, (reason))
     63 /// Starts processing \a closure on \a call_combiner.
     64 void grpc_call_combiner_start(grpc_call_combiner* call_combiner,
     65                               grpc_closure* closure, grpc_error* error,
     66                               const char* file, int line, const char* reason);
     67 /// Yields the call combiner to the next closure in the queue, if any.
     68 void grpc_call_combiner_stop(grpc_call_combiner* call_combiner,
     69                              const char* file, int line, const char* reason);
     70 #else
     71 #define GRPC_CALL_COMBINER_START(call_combiner, closure, error, reason) \
     72   grpc_call_combiner_start((call_combiner), (closure), (error), (reason))
     73 #define GRPC_CALL_COMBINER_STOP(call_combiner, reason) \
     74   grpc_call_combiner_stop((call_combiner), (reason))
     75 /// Starts processing \a closure on \a call_combiner.
     76 void grpc_call_combiner_start(grpc_call_combiner* call_combiner,
     77                               grpc_closure* closure, grpc_error* error,
     78                               const char* reason);
     79 /// Yields the call combiner to the next closure in the queue, if any.
     80 void grpc_call_combiner_stop(grpc_call_combiner* call_combiner,
     81                              const char* reason);
     82 #endif
     83 
     84 /// Registers \a closure to be invoked by \a call_combiner when
     85 /// grpc_call_combiner_cancel() is called.
     86 ///
     87 /// Once a closure is registered, it will always be scheduled exactly
     88 /// once; this allows the closure to hold references that will be freed
     89 /// regardless of whether or not the call was cancelled.  If a cancellation
     90 /// does occur, the closure will be scheduled with the cancellation error;
     91 /// otherwise, it will be scheduled with GRPC_ERROR_NONE.
     92 ///
     93 /// The closure will be scheduled in the following cases:
     94 /// - If grpc_call_combiner_cancel() was called prior to registering the
     95 ///   closure, it will be scheduled immediately with the cancelation error.
     96 /// - If grpc_call_combiner_cancel() is called after registering the
     97 ///   closure, the closure will be scheduled with the cancellation error.
     98 /// - If grpc_call_combiner_set_notify_on_cancel() is called again to
     99 ///   register a new cancellation closure, the previous cancellation
    100 ///   closure will be scheduled with GRPC_ERROR_NONE.
    101 ///
    102 /// If \a closure is NULL, then no closure will be invoked on
    103 /// cancellation; this effectively unregisters the previously set closure.
    104 /// However, most filters will not need to explicitly unregister their
    105 /// callbacks, as this is done automatically when the call is destroyed. Filters
    106 /// that schedule the cancellation closure on ExecCtx do not need to take a ref
    107 /// on the call stack to guarantee closure liveness. This is done by explicitly
    108 /// flushing ExecCtx after the unregistration during call destruction.
    109 void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner,
    110                                              grpc_closure* closure);
    111 
    112 /// Indicates that the call has been cancelled.
    113 void grpc_call_combiner_cancel(grpc_call_combiner* call_combiner,
    114                                grpc_error* error);
    115 
    116 namespace grpc_core {
    117 
    118 // Helper for running a list of closures in a call combiner.
    119 //
    120 // Each callback running in the call combiner will eventually be
    121 // returned to the surface, at which point the surface will yield the
    122 // call combiner.  So when we are running in the call combiner and have
    123 // more than one callback to return to the surface, we need to re-enter
    124 // the call combiner for all but one of those callbacks.
    125 class CallCombinerClosureList {
    126  public:
    127   CallCombinerClosureList() {}
    128 
    129   // Adds a closure to the list.  The closure must eventually result in
    130   // the call combiner being yielded.
    131   void Add(grpc_closure* closure, grpc_error* error, const char* reason) {
    132     closures_.emplace_back(closure, error, reason);
    133   }
    134 
    135   // Runs all closures in the call combiner and yields the call combiner.
    136   //
    137   // All but one of the closures in the list will be scheduled via
    138   // GRPC_CALL_COMBINER_START(), and the remaining closure will be
    139   // scheduled via GRPC_CLOSURE_SCHED(), which will eventually result in
    140   // yielding the call combiner.  If the list is empty, then the call
    141   // combiner will be yielded immediately.
    142   void RunClosures(grpc_call_combiner* call_combiner) {
    143     if (closures_.empty()) {
    144       GRPC_CALL_COMBINER_STOP(call_combiner, "no closures to schedule");
    145       return;
    146     }
    147     for (size_t i = 1; i < closures_.size(); ++i) {
    148       auto& closure = closures_[i];
    149       GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
    150                                closure.reason);
    151     }
    152     if (grpc_call_combiner_trace.enabled()) {
    153       gpr_log(GPR_INFO,
    154               "CallCombinerClosureList executing closure while already "
    155               "holding call_combiner %p: closure=%p error=%s reason=%s",
    156               call_combiner, closures_[0].closure,
    157               grpc_error_string(closures_[0].error), closures_[0].reason);
    158     }
    159     // This will release the call combiner.
    160     GRPC_CLOSURE_SCHED(closures_[0].closure, closures_[0].error);
    161     closures_.clear();
    162   }
    163 
    164   // Runs all closures in the call combiner, but does NOT yield the call
    165   // combiner.  All closures will be scheduled via GRPC_CALL_COMBINER_START().
    166   void RunClosuresWithoutYielding(grpc_call_combiner* call_combiner) {
    167     for (size_t i = 0; i < closures_.size(); ++i) {
    168       auto& closure = closures_[i];
    169       GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
    170                                closure.reason);
    171     }
    172     closures_.clear();
    173   }
    174 
    175   size_t size() const { return closures_.size(); }
    176 
    177  private:
    178   struct CallCombinerClosure {
    179     grpc_closure* closure;
    180     grpc_error* error;
    181     const char* reason;
    182 
    183     CallCombinerClosure(grpc_closure* closure, grpc_error* error,
    184                         const char* reason)
    185         : closure(closure), error(error), reason(reason) {}
    186   };
    187 
    188   // There are generally a maximum of 6 closures to run in the call
    189   // combiner, one for each pending op.
    190   InlinedVector<CallCombinerClosure, 6> closures_;
    191 };
    192 
    193 }  // namespace grpc_core
    194 
    195 #endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */
    196