1 /* 2 * 3 * Copyright 2017 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #include <grpc/support/port_platform.h> 20 21 #include "src/core/lib/gprpp/fork.h" 22 23 #include <string.h> 24 25 #include <grpc/support/alloc.h> 26 #include <grpc/support/sync.h> 27 #include <grpc/support/time.h> 28 29 #include "src/core/lib/gpr/env.h" 30 #include "src/core/lib/gpr/useful.h" 31 #include "src/core/lib/gprpp/memory.h" 32 33 /* 34 * NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK 35 * AROUND VERY SPECIFIC USE CASES. 36 */ 37 38 namespace grpc_core { 39 namespace internal { 40 // The exec_ctx_count has 2 modes, blocked and unblocked. 41 // When unblocked, the count is 2-indexed; exec_ctx_count=2 indicates 42 // 0 active ExecCtxs, exex_ctx_count=3 indicates 1 active ExecCtxs... 43 44 // When blocked, the exec_ctx_count is 0-indexed. Note that ExecCtx 45 // creation can only be blocked if there is exactly 1 outstanding ExecCtx, 46 // meaning that BLOCKED and UNBLOCKED counts partition the integers 47 #define UNBLOCKED(n) (n + 2) 48 #define BLOCKED(n) (n) 49 50 class ExecCtxState { 51 public: 52 ExecCtxState() : fork_complete_(true) { 53 gpr_mu_init(&mu_); 54 gpr_cv_init(&cv_); 55 gpr_atm_no_barrier_store(&count_, UNBLOCKED(0)); 56 } 57 58 void IncExecCtxCount() { 59 gpr_atm count = gpr_atm_no_barrier_load(&count_); 60 while (true) { 61 if (count <= BLOCKED(1)) { 62 // This only occurs if we are trying to fork. Wait until the fork() 63 // operation completes before allowing new ExecCtxs. 64 gpr_mu_lock(&mu_); 65 if (gpr_atm_no_barrier_load(&count_) <= BLOCKED(1)) { 66 while (!fork_complete_) { 67 gpr_cv_wait(&cv_, &mu_, gpr_inf_future(GPR_CLOCK_REALTIME)); 68 } 69 } 70 gpr_mu_unlock(&mu_); 71 } else if (gpr_atm_no_barrier_cas(&count_, count, count + 1)) { 72 break; 73 } 74 count = gpr_atm_no_barrier_load(&count_); 75 } 76 } 77 78 void DecExecCtxCount() { gpr_atm_no_barrier_fetch_add(&count_, -1); } 79 80 bool BlockExecCtx() { 81 // Assumes there is an active ExecCtx when this function is called 82 if (gpr_atm_no_barrier_cas(&count_, UNBLOCKED(1), BLOCKED(1))) { 83 gpr_mu_lock(&mu_); 84 fork_complete_ = false; 85 gpr_mu_unlock(&mu_); 86 return true; 87 } 88 return false; 89 } 90 91 void AllowExecCtx() { 92 gpr_mu_lock(&mu_); 93 gpr_atm_no_barrier_store(&count_, UNBLOCKED(0)); 94 fork_complete_ = true; 95 gpr_cv_broadcast(&cv_); 96 gpr_mu_unlock(&mu_); 97 } 98 99 ~ExecCtxState() { 100 gpr_mu_destroy(&mu_); 101 gpr_cv_destroy(&cv_); 102 } 103 104 private: 105 bool fork_complete_; 106 gpr_mu mu_; 107 gpr_cv cv_; 108 gpr_atm count_; 109 }; 110 111 class ThreadState { 112 public: 113 ThreadState() : awaiting_threads_(false), threads_done_(false), count_(0) { 114 gpr_mu_init(&mu_); 115 gpr_cv_init(&cv_); 116 } 117 118 void IncThreadCount() { 119 gpr_mu_lock(&mu_); 120 count_++; 121 gpr_mu_unlock(&mu_); 122 } 123 124 void DecThreadCount() { 125 gpr_mu_lock(&mu_); 126 count_--; 127 if (awaiting_threads_ && count_ == 0) { 128 threads_done_ = true; 129 gpr_cv_signal(&cv_); 130 } 131 gpr_mu_unlock(&mu_); 132 } 133 void AwaitThreads() { 134 gpr_mu_lock(&mu_); 135 awaiting_threads_ = true; 136 threads_done_ = (count_ == 0); 137 while (!threads_done_) { 138 gpr_cv_wait(&cv_, &mu_, gpr_inf_future(GPR_CLOCK_REALTIME)); 139 } 140 awaiting_threads_ = true; 141 gpr_mu_unlock(&mu_); 142 } 143 144 ~ThreadState() { 145 gpr_mu_destroy(&mu_); 146 gpr_cv_destroy(&cv_); 147 } 148 149 private: 150 bool awaiting_threads_; 151 bool threads_done_; 152 gpr_mu mu_; 153 gpr_cv cv_; 154 int count_; 155 }; 156 157 } // namespace 158 159 void Fork::GlobalInit() { 160 if (!override_enabled_) { 161 #ifdef GRPC_ENABLE_FORK_SUPPORT 162 support_enabled_ = true; 163 #else 164 support_enabled_ = false; 165 #endif 166 bool env_var_set = false; 167 char* env = gpr_getenv("GRPC_ENABLE_FORK_SUPPORT"); 168 if (env != nullptr) { 169 static const char* truthy[] = {"yes", "Yes", "YES", "true", 170 "True", "TRUE", "1"}; 171 static const char* falsey[] = {"no", "No", "NO", "false", 172 "False", "FALSE", "0"}; 173 for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) { 174 if (0 == strcmp(env, truthy[i])) { 175 support_enabled_ = true; 176 env_var_set = true; 177 break; 178 } 179 } 180 if (!env_var_set) { 181 for (size_t i = 0; i < GPR_ARRAY_SIZE(falsey); i++) { 182 if (0 == strcmp(env, falsey[i])) { 183 support_enabled_ = false; 184 env_var_set = true; 185 break; 186 } 187 } 188 } 189 gpr_free(env); 190 } 191 } 192 if (support_enabled_) { 193 exec_ctx_state_ = grpc_core::New<internal::ExecCtxState>(); 194 thread_state_ = grpc_core::New<internal::ThreadState>(); 195 } 196 } 197 198 void Fork::GlobalShutdown() { 199 if (support_enabled_) { 200 grpc_core::Delete(exec_ctx_state_); 201 grpc_core::Delete(thread_state_); 202 } 203 } 204 205 bool Fork::Enabled() { return support_enabled_; } 206 207 // Testing Only 208 void Fork::Enable(bool enable) { 209 override_enabled_ = true; 210 support_enabled_ = enable; 211 } 212 213 void Fork::IncExecCtxCount() { 214 if (support_enabled_) { 215 exec_ctx_state_->IncExecCtxCount(); 216 } 217 } 218 219 void Fork::DecExecCtxCount() { 220 if (support_enabled_) { 221 exec_ctx_state_->DecExecCtxCount(); 222 } 223 } 224 225 void Fork::SetResetChildPollingEngineFunc( 226 Fork::child_postfork_func reset_child_polling_engine) { 227 reset_child_polling_engine_ = reset_child_polling_engine; 228 } 229 Fork::child_postfork_func Fork::GetResetChildPollingEngineFunc() { 230 return reset_child_polling_engine_; 231 } 232 233 bool Fork::BlockExecCtx() { 234 if (support_enabled_) { 235 return exec_ctx_state_->BlockExecCtx(); 236 } 237 return false; 238 } 239 240 void Fork::AllowExecCtx() { 241 if (support_enabled_) { 242 exec_ctx_state_->AllowExecCtx(); 243 } 244 } 245 246 void Fork::IncThreadCount() { 247 if (support_enabled_) { 248 thread_state_->IncThreadCount(); 249 } 250 } 251 252 void Fork::DecThreadCount() { 253 if (support_enabled_) { 254 thread_state_->DecThreadCount(); 255 } 256 } 257 void Fork::AwaitThreads() { 258 if (support_enabled_) { 259 thread_state_->AwaitThreads(); 260 } 261 } 262 263 internal::ExecCtxState* Fork::exec_ctx_state_ = nullptr; 264 internal::ThreadState* Fork::thread_state_ = nullptr; 265 bool Fork::support_enabled_ = false; 266 bool Fork::override_enabled_ = false; 267 Fork::child_postfork_func Fork::reset_child_polling_engine_ = nullptr; 268 } // namespace grpc_core 269