Home | History | Annotate | Download | only in Grpc.Core
      1 #region Copyright notice and license
      2 
      3 // Copyright 2015 gRPC authors.
      4 //
      5 // Licensed under the Apache License, Version 2.0 (the "License");
      6 // you may not use this file except in compliance with the License.
      7 // You may obtain a copy of the License at
      8 //
      9 //     http://www.apache.org/licenses/LICENSE-2.0
     10 //
     11 // Unless required by applicable law or agreed to in writing, software
     12 // distributed under the License is distributed on an "AS IS" BASIS,
     13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14 // See the License for the specific language governing permissions and
     15 // limitations under the License.
     16 
     17 #endregion
     18 
     19 using System;
     20 using System.Collections.Generic;
     21 using System.Linq;
     22 using System.Runtime.InteropServices;
     23 using System.Threading.Tasks;
     24 using Grpc.Core.Internal;
     25 using Grpc.Core.Logging;
     26 using Grpc.Core.Utils;
     27 
     28 namespace Grpc.Core
     29 {
     30     /// <summary>
     31     /// Encapsulates initialization and shutdown of gRPC library.
     32     /// </summary>
     33     public class GrpcEnvironment
     34     {
     35         const int MinDefaultThreadPoolSize = 4;
     36         const int DefaultBatchContextPoolSharedCapacity = 10000;
     37         const int DefaultBatchContextPoolThreadLocalCapacity = 64;
     38         const int DefaultRequestCallContextPoolSharedCapacity = 10000;
     39         const int DefaultRequestCallContextPoolThreadLocalCapacity = 64;
     40 
     41         static object staticLock = new object();
     42         static GrpcEnvironment instance;
     43         static int refCount;
     44         static int? customThreadPoolSize;
     45         static int? customCompletionQueueCount;
     46         static bool inlineHandlers;
     47         static int batchContextPoolSharedCapacity = DefaultBatchContextPoolSharedCapacity;
     48         static int batchContextPoolThreadLocalCapacity = DefaultBatchContextPoolThreadLocalCapacity;
     49         static int requestCallContextPoolSharedCapacity = DefaultRequestCallContextPoolSharedCapacity;
     50         static int requestCallContextPoolThreadLocalCapacity = DefaultRequestCallContextPoolThreadLocalCapacity;
     51         static readonly HashSet<Channel> registeredChannels = new HashSet<Channel>();
     52         static readonly HashSet<Server> registeredServers = new HashSet<Server>();
     53         static readonly AtomicCounter nativeInitCounter = new AtomicCounter();
     54 
     55         static ILogger logger = new LogLevelFilterLogger(new ConsoleLogger(), LogLevel.Off, true);
     56 
     57         readonly IObjectPool<BatchContextSafeHandle> batchContextPool;
     58         readonly IObjectPool<RequestCallContextSafeHandle> requestCallContextPool;
     59         readonly GrpcThreadPool threadPool;
     60         readonly DebugStats debugStats = new DebugStats();
     61         readonly AtomicCounter cqPickerCounter = new AtomicCounter();
     62 
     63         bool isShutdown;
     64 
     65         /// <summary>
     66         /// Returns a reference-counted instance of initialized gRPC environment.
     67         /// Subsequent invocations return the same instance unless reference count has dropped to zero previously.
     68         /// </summary>
     69         internal static GrpcEnvironment AddRef()
     70         {
     71             ShutdownHooks.Register();
     72 
     73             lock (staticLock)
     74             {
     75                 refCount++;
     76                 if (instance == null)
     77                 {
     78                     instance = new GrpcEnvironment();
     79                 }
     80                 return instance;
     81             }
     82         }
     83 
     84         /// <summary>
     85         /// Decrements the reference count for currently active environment and asynchronously shuts down the gRPC environment if reference count drops to zero.
     86         /// </summary>
     87         internal static async Task ReleaseAsync()
     88         {
     89             GrpcEnvironment instanceToShutdown = null;
     90             lock (staticLock)
     91             {
     92                 GrpcPreconditions.CheckState(refCount > 0);
     93                 refCount--;
     94                 if (refCount == 0)
     95                 {
     96                     instanceToShutdown = instance;
     97                     instance = null;
     98                 }
     99             }
    100 
    101             if (instanceToShutdown != null)
    102             {
    103                 await instanceToShutdown.ShutdownAsync().ConfigureAwait(false);
    104             }
    105         }
    106 
    107         internal static int GetRefCount()
    108         {
    109             lock (staticLock)
    110             {
    111                 return refCount;
    112             }
    113         }
    114 
    115         internal static void RegisterChannel(Channel channel)
    116         {
    117             lock (staticLock)
    118             {
    119                 GrpcPreconditions.CheckNotNull(channel);
    120                 registeredChannels.Add(channel);
    121             }
    122         }
    123 
    124         internal static void UnregisterChannel(Channel channel)
    125         {
    126             lock (staticLock)
    127             {
    128                 GrpcPreconditions.CheckNotNull(channel);
    129                 GrpcPreconditions.CheckArgument(registeredChannels.Remove(channel), "Channel not found in the registered channels set.");
    130             }
    131         }
    132 
    133         internal static void RegisterServer(Server server)
    134         {
    135             lock (staticLock)
    136             {
    137                 GrpcPreconditions.CheckNotNull(server);
    138                 registeredServers.Add(server);
    139             }
    140         }
    141 
    142         internal static void UnregisterServer(Server server)
    143         {
    144             lock (staticLock)
    145             {
    146                 GrpcPreconditions.CheckNotNull(server);
    147                 GrpcPreconditions.CheckArgument(registeredServers.Remove(server), "Server not found in the registered servers set.");
    148             }
    149         }
    150 
    151         /// <summary>
    152         /// Requests shutdown of all channels created by the current process.
    153         /// </summary>
    154         public static Task ShutdownChannelsAsync()
    155         {
    156             HashSet<Channel> snapshot = null;
    157             lock (staticLock)
    158             {
    159                 snapshot = new HashSet<Channel>(registeredChannels);
    160             }
    161             return Task.WhenAll(snapshot.Select((channel) => channel.ShutdownAsync()));
    162         }
    163 
    164         /// <summary>
    165         /// Requests immediate shutdown of all servers created by the current process.
    166         /// </summary>
    167         public static Task KillServersAsync()
    168         {
    169             HashSet<Server> snapshot = null;
    170             lock (staticLock)
    171             {
    172                 snapshot = new HashSet<Server>(registeredServers);
    173             }
    174             return Task.WhenAll(snapshot.Select((server) => server.KillAsync()));
    175         }
    176 
    177         /// <summary>
    178         /// Gets application-wide logger used by gRPC.
    179         /// </summary>
    180         /// <value>The logger.</value>
    181         public static ILogger Logger
    182         {
    183             get
    184             {
    185                 return logger;
    186             }
    187         }
    188 
    189         /// <summary>
    190         /// Sets the application-wide logger that should be used by gRPC.
    191         /// </summary>
    192         public static void SetLogger(ILogger customLogger)
    193         {
    194             GrpcPreconditions.CheckNotNull(customLogger, "customLogger");
    195             logger = customLogger;
    196         }
    197 
    198         /// <summary>
    199         /// Sets the number of threads in the gRPC thread pool that polls for internal RPC events.
    200         /// Can be only invoked before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards.
    201         /// Setting thread pool size is an advanced setting and you should only use it if you know what you are doing.
    202         /// Most users should rely on the default value provided by gRPC library.
    203         /// Note: this method is part of an experimental API that can change or be removed without any prior notice.
    204         /// </summary>
    205         public static void SetThreadPoolSize(int threadCount)
    206         {
    207             lock (staticLock)
    208             {
    209                 GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized");
    210                 GrpcPreconditions.CheckArgument(threadCount > 0, "threadCount needs to be a positive number");
    211                 customThreadPoolSize = threadCount;
    212             }
    213         }
    214 
    215         /// <summary>
    216         /// Sets the number of completion queues in the  gRPC thread pool that polls for internal RPC events.
    217         /// Can be only invoked before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards.
    218         /// Setting the number of completions queues is an advanced setting and you should only use it if you know what you are doing.
    219         /// Most users should rely on the default value provided by gRPC library.
    220         /// Note: this method is part of an experimental API that can change or be removed without any prior notice.
    221         /// </summary>
    222         public static void SetCompletionQueueCount(int completionQueueCount)
    223         {
    224             lock (staticLock)
    225             {
    226                 GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized");
    227                 GrpcPreconditions.CheckArgument(completionQueueCount > 0, "threadCount needs to be a positive number");
    228                 customCompletionQueueCount = completionQueueCount;
    229             }
    230         }
    231 
    232         /// <summary>
    233         /// By default, gRPC's internal event handlers get offloaded to .NET default thread pool thread (<c>inlineHandlers=false</c>).
    234         /// Setting <c>inlineHandlers</c> to <c>true</c> will allow scheduling the event handlers directly to
    235         /// <c>GrpcThreadPool</c> internal threads. That can lead to significant performance gains in some situations,
    236         /// but requires user to never block in async code (incorrectly written code can easily lead to deadlocks).
    237         /// Inlining handlers is an advanced setting and you should only use it if you know what you are doing.
    238         /// Most users should rely on the default value provided by gRPC library.
    239         /// Note: this method is part of an experimental API that can change or be removed without any prior notice.
    240         /// Note: <c>inlineHandlers=true</c> was the default in gRPC C# v1.4.x and earlier.
    241         /// </summary>
    242         public static void SetHandlerInlining(bool inlineHandlers)
    243         {
    244             lock (staticLock)
    245             {
    246                 GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized");
    247                 GrpcEnvironment.inlineHandlers = inlineHandlers;
    248             }
    249         }
    250 
    251         /// <summary>
    252         /// Sets the parameters for a pool that caches batch context instances. Reusing batch context instances
    253         /// instead of creating a new one for every C core operation helps reducing the GC pressure.
    254         /// Can be only invoked before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards.
    255         /// This is an advanced setting and you should only use it if you know what you are doing.
    256         /// Most users should rely on the default value provided by gRPC library.
    257         /// Note: this method is part of an experimental API that can change or be removed without any prior notice.
    258         /// </summary>
    259         public static void SetBatchContextPoolParams(int sharedCapacity, int threadLocalCapacity)
    260         {
    261             lock (staticLock)
    262             {
    263                 GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized");
    264                 GrpcPreconditions.CheckArgument(sharedCapacity >= 0, "Shared capacity needs to be a non-negative number");
    265                 GrpcPreconditions.CheckArgument(threadLocalCapacity >= 0, "Thread local capacity needs to be a non-negative number");
    266                 batchContextPoolSharedCapacity = sharedCapacity;
    267                 batchContextPoolThreadLocalCapacity = threadLocalCapacity;
    268             }
    269         }
    270 
    271         /// <summary>
    272         /// Sets the parameters for a pool that caches request call context instances. Reusing request call context instances
    273         /// instead of creating a new one for every requested call in C core helps reducing the GC pressure.
    274         /// Can be only invoked before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards.
    275         /// This is an advanced setting and you should only use it if you know what you are doing.
    276         /// Most users should rely on the default value provided by gRPC library.
    277         /// Note: this method is part of an experimental API that can change or be removed without any prior notice.
    278         /// </summary>
    279         public static void SetRequestCallContextPoolParams(int sharedCapacity, int threadLocalCapacity)
    280         {
    281             lock (staticLock)
    282             {
    283                 GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized");
    284                 GrpcPreconditions.CheckArgument(sharedCapacity >= 0, "Shared capacity needs to be a non-negative number");
    285                 GrpcPreconditions.CheckArgument(threadLocalCapacity >= 0, "Thread local capacity needs to be a non-negative number");
    286                 requestCallContextPoolSharedCapacity = sharedCapacity;
    287                 requestCallContextPoolThreadLocalCapacity = threadLocalCapacity;
    288             }
    289         }
    290 
    291         /// <summary>
        /// Occurs when <c>GrpcEnvironment</c> is about to start the shutdown logic.
    293         /// If <c>GrpcEnvironment</c> is later initialized and shutdown, the event will be fired again (unless unregistered first).
    294         /// </summary>
    295         public static event EventHandler ShuttingDown;
    296 
    297         /// <summary>
    298         /// Creates gRPC environment.
    299         /// </summary>
    300         private GrpcEnvironment()
    301         {
    302             GrpcNativeInit();
    303             batchContextPool = new DefaultObjectPool<BatchContextSafeHandle>(() => BatchContextSafeHandle.Create(), batchContextPoolSharedCapacity, batchContextPoolThreadLocalCapacity);
    304             requestCallContextPool = new DefaultObjectPool<RequestCallContextSafeHandle>(() => RequestCallContextSafeHandle.Create(), requestCallContextPoolSharedCapacity, requestCallContextPoolThreadLocalCapacity);
    305             threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault(), inlineHandlers);
    306             threadPool.Start();
    307         }
    308 
    309         /// <summary>
    310         /// Gets the completion queues used by this gRPC environment.
    311         /// </summary>
    312         internal IReadOnlyCollection<CompletionQueueSafeHandle> CompletionQueues
    313         {
    314             get
    315             {
    316                 return this.threadPool.CompletionQueues;
    317             }
    318         }
    319 
    320         internal IObjectPool<BatchContextSafeHandle> BatchContextPool => batchContextPool;
    321 
    322         internal IObjectPool<RequestCallContextSafeHandle> RequestCallContextPool => requestCallContextPool;
    323 
    324         internal bool IsAlive
    325         {
    326             get
    327             {
    328                 return this.threadPool.IsAlive;
    329             }
    330         }
    331 
    332         /// <summary>
    333         /// Picks a completion queue in a round-robin fashion.
    334         /// Shouldn't be invoked on a per-call basis (used at per-channel basis).
    335         /// </summary>
    336         internal CompletionQueueSafeHandle PickCompletionQueue()
    337         {
    338             var cqIndex = (int) ((cqPickerCounter.Increment() - 1) % this.threadPool.CompletionQueues.Count);
    339             return this.threadPool.CompletionQueues.ElementAt(cqIndex);
    340         }
    341 
    342         /// <summary>
    343         /// Gets the completion queue used by this gRPC environment.
    344         /// </summary>
    345         internal DebugStats DebugStats
    346         {
    347             get
    348             {
    349                 return this.debugStats;
    350             }
    351         }
    352 
    353         /// <summary>
    354         /// Gets version of gRPC C core.
    355         /// </summary>
    356         internal static string GetCoreVersionString()
    357         {
    358             var ptr = NativeMethods.Get().grpcsharp_version_string();  // the pointer is not owned
    359             return Marshal.PtrToStringAnsi(ptr);
    360         }
    361 
    362         internal static void GrpcNativeInit()
    363         {
    364             if (!IsNativeShutdownAllowed && nativeInitCounter.Count > 0)
    365             {
    366                 // Normally grpc_init and grpc_shutdown calls should come in pairs (C core does reference counting),
    367                 // but in case we avoid grpc_shutdown calls altogether, calling grpc_init has no effect
    368                 // besides incrementing an internal C core counter that could theoretically overflow.
    369                 // To avoid this theoretical possibility we guard repeated calls to grpc_init()
    370                 // with a 64-bit atomic counter (that can't realistically overflow).
    371                 return;
    372             }
    373             NativeMethods.Get().grpcsharp_init();
    374             nativeInitCounter.Increment();
    375         }
    376 
    377         internal static void GrpcNativeShutdown()
    378         {
    379             if (IsNativeShutdownAllowed)
    380             {
    381                 NativeMethods.Get().grpcsharp_shutdown();
    382             }
    383         }
    384 
    385         /// <summary>
    386         /// Shuts down this environment.
    387         /// </summary>
    388         private async Task ShutdownAsync()
    389         {
    390             if (isShutdown)
    391             {
    392                 throw new InvalidOperationException("ShutdownAsync has already been called");
    393             }
    394 
    395             await Task.Run(() => ShuttingDown?.Invoke(this, null)).ConfigureAwait(false);
    396 
    397             await threadPool.StopAsync().ConfigureAwait(false);
    398             requestCallContextPool.Dispose();
    399             batchContextPool.Dispose();
    400             GrpcNativeShutdown();
    401             isShutdown = true;
    402 
    403             debugStats.CheckOK();
    404         }
    405 
    406         private int GetThreadPoolSizeOrDefault()
    407         {
    408             if (customThreadPoolSize.HasValue)
    409             {
    410                 return customThreadPoolSize.Value;
    411             }
    412             // In systems with many cores, use half of the cores for GrpcThreadPool
    413             // and the other half for .NET thread pool. This heuristic definitely needs
    414             // more work, but seems to work reasonably well for a start.
    415             return Math.Max(MinDefaultThreadPoolSize, Environment.ProcessorCount / 2);
    416         }
    417 
    418         private int GetCompletionQueueCountOrDefault()
    419         {
    420             if (customCompletionQueueCount.HasValue)
    421             {
    422                 return customCompletionQueueCount.Value;
    423             }
    424             // by default, create a completion queue for each thread
    425             return GetThreadPoolSizeOrDefault();
    426         }
    427 
    428         // On some platforms (specifically iOS), thread local variables in native code
    429         // require initialization/destruction. By skipping the grpc_shutdown() call,
    430         // we avoid a potential crash where grpc_shutdown() has already destroyed
    431         // the thread local variables, but some C core's *_destroy() methods still
    432         // need to run (e.g. they may be run by finalizer thread which is out of our control)
    433         // For more context, see https://github.com/grpc/grpc/issues/16294
    434         private static bool IsNativeShutdownAllowed => !PlatformApis.IsXamarinIOS && !PlatformApis.IsUnityIOS;
    435 
    436         private static class ShutdownHooks
    437         {
    438             static object staticLock = new object();
    439             static bool hooksRegistered;
    440 
    441             public static void Register()
    442             {
    443                 lock (staticLock)
    444                 {
    445                     if (!hooksRegistered)
    446                     {
    447                         // Under normal circumstances, the user is expected to shutdown all
    448                         // the gRPC channels and servers before the application exits. The following
    449                         // hooks provide some extra handling for cases when this is not the case,
    450                         // in the effort to achieve a reasonable behavior on shutdown.
    451 #if NETSTANDARD1_5
    452                         // No action required at shutdown on .NET Core
    453                         // - In-progress P/Invoke calls (such as grpc_completion_queue_next) don't seem
    454                         //   to prevent a .NET core application from terminating, so no special handling
    455                         //   is needed.
    456                         // - .NET core doesn't run finalizers on shutdown, so there's no risk of getting
    457                         //   a crash because grpc_*_destroy methods for native objects being invoked
    458                         //   in wrong order.
    459                         // TODO(jtattermusch): Verify that the shutdown hooks are still not needed
    460                         // once we add support for new platforms using netstandard (e.g. Xamarin).
    461 #else
    462                         // On desktop .NET framework and Mono, we need to register for a shutdown
    463                         // event to explicitly shutdown the GrpcEnvironment.
    464                         // - On Desktop .NET framework, we need to do a proper shutdown to prevent a crash
    465                         //   when the framework attempts to run the finalizers for SafeHandle object representing the native
    466                         //   grpc objects. The finalizers calls the native grpc_*_destroy methods (e.g. grpc_server_destroy)
    467                         //   in a random order, which is not supported by gRPC.
    468                         // - On Mono, the process would hang as the GrpcThreadPool threads are sleeping
    469                         //   in grpc_completion_queue_next P/Invoke invocation and mono won't let the
    470                         //   process shutdown until the P/Invoke calls return. We achieve that by shutting down
    471                         //   the completion queue(s) which associated with the GrpcThreadPool, which will
    472                         //   cause the grpc_completion_queue_next calls to return immediately.
    473                         AppDomain.CurrentDomain.ProcessExit += (sender, eventArgs) => { HandleShutdown(); };
    474                         AppDomain.CurrentDomain.DomainUnload += (sender, eventArgs) => { HandleShutdown(); };
    475 #endif
    476                     }
    477                     hooksRegistered = true;
    478                 }
    479             }
    480 
    481             /// <summary>
    482             /// Handler for AppDomain.DomainUnload, AppDomain.ProcessExit and AssemblyLoadContext.Unloading hooks.
    483             /// </summary>
    484             private static void HandleShutdown()
    485             {
    486                 Task.WaitAll(GrpcEnvironment.ShutdownChannelsAsync(), GrpcEnvironment.KillServersAsync());
    487             }
    488         }
    489     }
    490 }
    491