Home | History | Annotate | Download | only in gprpp
      1 /*
      2  *
      3  * Copyright 2017 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #include <grpc/support/port_platform.h>
     20 
     21 #include "src/core/lib/gprpp/fork.h"
     22 
     23 #include <string.h>
     24 
     25 #include <grpc/support/alloc.h>
     26 #include <grpc/support/sync.h>
     27 #include <grpc/support/time.h>
     28 
     29 #include "src/core/lib/gpr/env.h"
     30 #include "src/core/lib/gpr/useful.h"
     31 #include "src/core/lib/gprpp/memory.h"
     32 
     33 /*
     34  * NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK
     35  *       AROUND VERY SPECIFIC USE CASES.
     36  */
     37 
     38 namespace grpc_core {
     39 namespace internal {
     40 // The exec_ctx_count has 2 modes, blocked and unblocked.
     41 // When unblocked, the count is 2-indexed; exec_ctx_count=2 indicates
     42 // 0 active ExecCtxs, exex_ctx_count=3 indicates 1 active ExecCtxs...
     43 
     44 // When blocked, the exec_ctx_count is 0-indexed.  Note that ExecCtx
     45 // creation can only be blocked if there is exactly 1 outstanding ExecCtx,
     46 // meaning that BLOCKED and UNBLOCKED counts partition the integers
     47 #define UNBLOCKED(n) (n + 2)
     48 #define BLOCKED(n) (n)
     49 
     50 class ExecCtxState {
     51  public:
     52   ExecCtxState() : fork_complete_(true) {
     53     gpr_mu_init(&mu_);
     54     gpr_cv_init(&cv_);
     55     gpr_atm_no_barrier_store(&count_, UNBLOCKED(0));
     56   }
     57 
     58   void IncExecCtxCount() {
     59     gpr_atm count = gpr_atm_no_barrier_load(&count_);
     60     while (true) {
     61       if (count <= BLOCKED(1)) {
     62         // This only occurs if we are trying to fork.  Wait until the fork()
     63         // operation completes before allowing new ExecCtxs.
     64         gpr_mu_lock(&mu_);
     65         if (gpr_atm_no_barrier_load(&count_) <= BLOCKED(1)) {
     66           while (!fork_complete_) {
     67             gpr_cv_wait(&cv_, &mu_, gpr_inf_future(GPR_CLOCK_REALTIME));
     68           }
     69         }
     70         gpr_mu_unlock(&mu_);
     71       } else if (gpr_atm_no_barrier_cas(&count_, count, count + 1)) {
     72         break;
     73       }
     74       count = gpr_atm_no_barrier_load(&count_);
     75     }
     76   }
     77 
     78   void DecExecCtxCount() { gpr_atm_no_barrier_fetch_add(&count_, -1); }
     79 
     80   bool BlockExecCtx() {
     81     // Assumes there is an active ExecCtx when this function is called
     82     if (gpr_atm_no_barrier_cas(&count_, UNBLOCKED(1), BLOCKED(1))) {
     83       gpr_mu_lock(&mu_);
     84       fork_complete_ = false;
     85       gpr_mu_unlock(&mu_);
     86       return true;
     87     }
     88     return false;
     89   }
     90 
     91   void AllowExecCtx() {
     92     gpr_mu_lock(&mu_);
     93     gpr_atm_no_barrier_store(&count_, UNBLOCKED(0));
     94     fork_complete_ = true;
     95     gpr_cv_broadcast(&cv_);
     96     gpr_mu_unlock(&mu_);
     97   }
     98 
     99   ~ExecCtxState() {
    100     gpr_mu_destroy(&mu_);
    101     gpr_cv_destroy(&cv_);
    102   }
    103 
    104  private:
    105   bool fork_complete_;
    106   gpr_mu mu_;
    107   gpr_cv cv_;
    108   gpr_atm count_;
    109 };
    110 
    111 class ThreadState {
    112  public:
    113   ThreadState() : awaiting_threads_(false), threads_done_(false), count_(0) {
    114     gpr_mu_init(&mu_);
    115     gpr_cv_init(&cv_);
    116   }
    117 
    118   void IncThreadCount() {
    119     gpr_mu_lock(&mu_);
    120     count_++;
    121     gpr_mu_unlock(&mu_);
    122   }
    123 
    124   void DecThreadCount() {
    125     gpr_mu_lock(&mu_);
    126     count_--;
    127     if (awaiting_threads_ && count_ == 0) {
    128       threads_done_ = true;
    129       gpr_cv_signal(&cv_);
    130     }
    131     gpr_mu_unlock(&mu_);
    132   }
    133   void AwaitThreads() {
    134     gpr_mu_lock(&mu_);
    135     awaiting_threads_ = true;
    136     threads_done_ = (count_ == 0);
    137     while (!threads_done_) {
    138       gpr_cv_wait(&cv_, &mu_, gpr_inf_future(GPR_CLOCK_REALTIME));
    139     }
    140     awaiting_threads_ = true;
    141     gpr_mu_unlock(&mu_);
    142   }
    143 
    144   ~ThreadState() {
    145     gpr_mu_destroy(&mu_);
    146     gpr_cv_destroy(&cv_);
    147   }
    148 
    149  private:
    150   bool awaiting_threads_;
    151   bool threads_done_;
    152   gpr_mu mu_;
    153   gpr_cv cv_;
    154   int count_;
    155 };
    156 
    157 }  // namespace
    158 
    159 void Fork::GlobalInit() {
    160   if (!override_enabled_) {
    161 #ifdef GRPC_ENABLE_FORK_SUPPORT
    162     support_enabled_ = true;
    163 #else
    164     support_enabled_ = false;
    165 #endif
    166     bool env_var_set = false;
    167     char* env = gpr_getenv("GRPC_ENABLE_FORK_SUPPORT");
    168     if (env != nullptr) {
    169       static const char* truthy[] = {"yes",  "Yes",  "YES", "true",
    170                                      "True", "TRUE", "1"};
    171       static const char* falsey[] = {"no",    "No",    "NO", "false",
    172                                      "False", "FALSE", "0"};
    173       for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) {
    174         if (0 == strcmp(env, truthy[i])) {
    175           support_enabled_ = true;
    176           env_var_set = true;
    177           break;
    178         }
    179       }
    180       if (!env_var_set) {
    181         for (size_t i = 0; i < GPR_ARRAY_SIZE(falsey); i++) {
    182           if (0 == strcmp(env, falsey[i])) {
    183             support_enabled_ = false;
    184             env_var_set = true;
    185             break;
    186           }
    187         }
    188       }
    189       gpr_free(env);
    190     }
    191   }
    192   if (support_enabled_) {
    193     exec_ctx_state_ = grpc_core::New<internal::ExecCtxState>();
    194     thread_state_ = grpc_core::New<internal::ThreadState>();
    195   }
    196 }
    197 
    198 void Fork::GlobalShutdown() {
    199   if (support_enabled_) {
    200     grpc_core::Delete(exec_ctx_state_);
    201     grpc_core::Delete(thread_state_);
    202   }
    203 }
    204 
    205 bool Fork::Enabled() { return support_enabled_; }
    206 
    207 // Testing Only
    208 void Fork::Enable(bool enable) {
    209   override_enabled_ = true;
    210   support_enabled_ = enable;
    211 }
    212 
    213 void Fork::IncExecCtxCount() {
    214   if (support_enabled_) {
    215     exec_ctx_state_->IncExecCtxCount();
    216   }
    217 }
    218 
    219 void Fork::DecExecCtxCount() {
    220   if (support_enabled_) {
    221     exec_ctx_state_->DecExecCtxCount();
    222   }
    223 }
    224 
    225 void Fork::SetResetChildPollingEngineFunc(
    226     Fork::child_postfork_func reset_child_polling_engine) {
    227   reset_child_polling_engine_ = reset_child_polling_engine;
    228 }
    229 Fork::child_postfork_func Fork::GetResetChildPollingEngineFunc() {
    230   return reset_child_polling_engine_;
    231 }
    232 
    233 bool Fork::BlockExecCtx() {
    234   if (support_enabled_) {
    235     return exec_ctx_state_->BlockExecCtx();
    236   }
    237   return false;
    238 }
    239 
    240 void Fork::AllowExecCtx() {
    241   if (support_enabled_) {
    242     exec_ctx_state_->AllowExecCtx();
    243   }
    244 }
    245 
    246 void Fork::IncThreadCount() {
    247   if (support_enabled_) {
    248     thread_state_->IncThreadCount();
    249   }
    250 }
    251 
    252 void Fork::DecThreadCount() {
    253   if (support_enabled_) {
    254     thread_state_->DecThreadCount();
    255   }
    256 }
    257 void Fork::AwaitThreads() {
    258   if (support_enabled_) {
    259     thread_state_->AwaitThreads();
    260   }
    261 }
    262 
    263 internal::ExecCtxState* Fork::exec_ctx_state_ = nullptr;
    264 internal::ThreadState* Fork::thread_state_ = nullptr;
    265 bool Fork::support_enabled_ = false;
    266 bool Fork::override_enabled_ = false;
    267 Fork::child_postfork_func Fork::reset_child_polling_engine_ = nullptr;
    268 }  // namespace grpc_core
    269