Home | History | Annotate | Download | only in iomgr
      1 /*
      2  *
      3  * Copyright 2016 gRPC authors.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  */
     18 
     19 #include <grpc/support/port_platform.h>
     20 
     21 #include "src/core/lib/iomgr/resource_quota.h"
     22 
     23 #include <inttypes.h>
     24 #include <limits.h>
     25 #include <stdint.h>
     26 #include <string.h>
     27 
     28 #include <grpc/slice_buffer.h>
     29 #include <grpc/support/alloc.h>
     30 #include <grpc/support/log.h>
     31 #include <grpc/support/string_util.h>
     32 
     33 #include "src/core/lib/gpr/useful.h"
     34 #include "src/core/lib/iomgr/combiner.h"
     35 
     36 grpc_core::TraceFlag grpc_resource_quota_trace(false, "resource_quota");
     37 
     38 #define MEMORY_USAGE_ESTIMATION_MAX 65536
     39 
     40 /* Internal linked list pointers for a resource user */
     41 typedef struct {
     42   grpc_resource_user* next;
     43   grpc_resource_user* prev;
     44 } grpc_resource_user_link;
     45 
     46 /* Resource users are kept in (potentially) several intrusive linked lists
     47    at once. These are the list names. */
     48 typedef enum {
     49   /* Resource users that are waiting for an allocation */
     50   GRPC_RULIST_AWAITING_ALLOCATION,
     51   /* Resource users that have free memory available for internal reclamation */
     52   GRPC_RULIST_NON_EMPTY_FREE_POOL,
     53   /* Resource users that have published a benign reclamation is available */
     54   GRPC_RULIST_RECLAIMER_BENIGN,
     55   /* Resource users that have published a destructive reclamation is
     56      available */
     57   GRPC_RULIST_RECLAIMER_DESTRUCTIVE,
     58   /* Number of lists: must be last */
     59   GRPC_RULIST_COUNT
     60 } grpc_rulist;
     61 
     62 struct grpc_resource_user {
     63   /* The quota this resource user consumes from */
     64   grpc_resource_quota* resource_quota;
     65 
     66   /* Closure to schedule an allocation under the resource quota combiner lock */
     67   grpc_closure allocate_closure;
     68   /* Closure to publish a non empty free pool under the resource quota combiner
     69      lock */
     70   grpc_closure add_to_free_pool_closure;
     71 
     72   /* one ref for each ref call (released by grpc_resource_user_unref), and one
     73      ref for each byte allocated (released by grpc_resource_user_free) */
     74   gpr_atm refs;
     75   /* is this resource user unlocked? starts at 0, increases for each shutdown
     76      call */
     77   gpr_atm shutdown;
     78 
     79   gpr_mu mu;
     80   /* The amount of memory (in bytes) this user has cached for its own use: to
     81      avoid quota contention, each resource user can keep some memory in
     82      addition to what it is immediately using (e.g., for caching), and the quota
     83      can pull it back under memory pressure.
     84      This value can become negative if more memory has been requested than
     85      existed in the free pool, at which point the quota is consulted to bring
     86      this value non-negative (asynchronously). */
     87   int64_t free_pool;
     88   /* A list of closures to call once free_pool becomes non-negative - ie when
     89      all outstanding allocations have been granted. */
     90   grpc_closure_list on_allocated;
     91   /* True if we are currently trying to allocate from the quota, false if not */
     92   bool allocating;
     93   /* How many bytes of allocations are outstanding */
     94   int64_t outstanding_allocations;
     95   /* True if we are currently trying to add ourselves to the non-free quota
     96      list, false otherwise */
     97   bool added_to_free_pool;
     98 
     99   /* The number of threads currently allocated to this resource user */
    100   gpr_atm num_threads_allocated;
    101 
    102   /* Reclaimers: index 0 is the benign reclaimer, 1 is the destructive reclaimer
    103    */
    104   grpc_closure* reclaimers[2];
    105   /* Reclaimers just posted: once we're in the combiner lock, we'll move them
    106      to the array above */
    107   grpc_closure* new_reclaimers[2];
    108   /* Trampoline closures to finish reclamation and re-enter the quota combiner
    109      lock */
    110   grpc_closure post_reclaimer_closure[2];
    111 
    112   /* Closure to execute under the quota combiner to de-register and shutdown the
    113      resource user */
    114   grpc_closure destroy_closure;
    115 
    116   /* Links in the various grpc_rulist lists */
    117   grpc_resource_user_link links[GRPC_RULIST_COUNT];
    118 
    119   /* The name of this resource user, for debugging/tracing */
    120   char* name;
    121 };
    122 
    123 struct grpc_resource_quota {
    124   /* refcount */
    125   gpr_refcount refs;
    126 
    127   /* estimate of current memory usage
    128      scaled to the range [0..RESOURCE_USAGE_ESTIMATION_MAX] */
    129   gpr_atm memory_usage_estimation;
    130 
    131   /* Master combiner lock: all activity on a quota executes under this combiner
    132    * (so no mutex is needed for this data structure) */
    133   grpc_combiner* combiner;
    134   /* Size of the resource quota */
    135   int64_t size;
    136   /* Amount of free memory in the resource quota */
    137   int64_t free_pool;
    138 
    139   gpr_atm last_size;
    140 
    141   /* Mutex to protect max_threads and num_threads_allocated */
    142   /* Note: We could have used gpr_atm for max_threads and num_threads_allocated
    143    * and avoid having this mutex; but in that case, each invocation of the
    144    * function grpc_resource_user_allocate_threads() would have had to do at
    145    * least two atomic loads (for max_threads and num_threads_allocated) followed
    146    * by a CAS (on num_threads_allocated).
    147    * Moreover, we expect grpc_resource_user_allocate_threads() to be often
    148    * called concurrently thereby increasing the chances of failing the CAS
    149    * operation. This additional complexity is not worth the tiny perf gain we
    150    * may (or may not) have by using atomics */
    151   gpr_mu thread_count_mu;
    152 
    153   /* Max number of threads allowed */
    154   int max_threads;
    155 
    156   /* Number of threads currently allocated via this resource_quota object */
    157   int num_threads_allocated;
    158 
    159   /* Has rq_step been scheduled to occur? */
    160   bool step_scheduled;
    161 
    162   /* Are we currently reclaiming memory */
    163   bool reclaiming;
    164 
    165   /* Closure around rq_step */
    166   grpc_closure rq_step_closure;
    167 
    168   /* Closure around rq_reclamation_done */
    169   grpc_closure rq_reclamation_done_closure;
    170 
    171   /* This is only really usable for debugging: it's always a stale pointer, but
    172      a stale pointer that might just be fresh enough to guide us to where the
    173      reclamation system is stuck */
    174   grpc_closure* debug_only_last_initiated_reclaimer;
    175   grpc_resource_user* debug_only_last_reclaimer_resource_user;
    176 
    177   /* Roots of all resource user lists */
    178   grpc_resource_user* roots[GRPC_RULIST_COUNT];
    179 
    180   char* name;
    181 };
    182 
    183 static void ru_unref_by(grpc_resource_user* resource_user, gpr_atm amount);
    184 
    185 /*******************************************************************************
    186  * list management
    187  */
    188 
    189 static void rulist_add_head(grpc_resource_user* resource_user,
    190                             grpc_rulist list) {
    191   grpc_resource_quota* resource_quota = resource_user->resource_quota;
    192   grpc_resource_user** root = &resource_quota->roots[list];
    193   if (*root == nullptr) {
    194     *root = resource_user;
    195     resource_user->links[list].next = resource_user->links[list].prev =
    196         resource_user;
    197   } else {
    198     resource_user->links[list].next = *root;
    199     resource_user->links[list].prev = (*root)->links[list].prev;
    200     resource_user->links[list].next->links[list].prev =
    201         resource_user->links[list].prev->links[list].next = resource_user;
    202     *root = resource_user;
    203   }
    204 }
    205 
    206 static void rulist_add_tail(grpc_resource_user* resource_user,
    207                             grpc_rulist list) {
    208   grpc_resource_quota* resource_quota = resource_user->resource_quota;
    209   grpc_resource_user** root = &resource_quota->roots[list];
    210   if (*root == nullptr) {
    211     *root = resource_user;
    212     resource_user->links[list].next = resource_user->links[list].prev =
    213         resource_user;
    214   } else {
    215     resource_user->links[list].next = (*root)->links[list].next;
    216     resource_user->links[list].prev = *root;
    217     resource_user->links[list].next->links[list].prev =
    218         resource_user->links[list].prev->links[list].next = resource_user;
    219   }
    220 }
    221 
    222 static bool rulist_empty(grpc_resource_quota* resource_quota,
    223                          grpc_rulist list) {
    224   return resource_quota->roots[list] == nullptr;
    225 }
    226 
    227 static grpc_resource_user* rulist_pop_head(grpc_resource_quota* resource_quota,
    228                                            grpc_rulist list) {
    229   grpc_resource_user** root = &resource_quota->roots[list];
    230   grpc_resource_user* resource_user = *root;
    231   if (resource_user == nullptr) {
    232     return nullptr;
    233   }
    234   if (resource_user->links[list].next == resource_user) {
    235     *root = nullptr;
    236   } else {
    237     resource_user->links[list].next->links[list].prev =
    238         resource_user->links[list].prev;
    239     resource_user->links[list].prev->links[list].next =
    240         resource_user->links[list].next;
    241     *root = resource_user->links[list].next;
    242   }
    243   resource_user->links[list].next = resource_user->links[list].prev = nullptr;
    244   return resource_user;
    245 }
    246 
    247 static void rulist_remove(grpc_resource_user* resource_user, grpc_rulist list) {
    248   if (resource_user->links[list].next == nullptr) return;
    249   grpc_resource_quota* resource_quota = resource_user->resource_quota;
    250   if (resource_quota->roots[list] == resource_user) {
    251     resource_quota->roots[list] = resource_user->links[list].next;
    252     if (resource_quota->roots[list] == resource_user) {
    253       resource_quota->roots[list] = nullptr;
    254     }
    255   }
    256   resource_user->links[list].next->links[list].prev =
    257       resource_user->links[list].prev;
    258   resource_user->links[list].prev->links[list].next =
    259       resource_user->links[list].next;
    260   resource_user->links[list].next = resource_user->links[list].prev = nullptr;
    261 }
    262 
    263 /*******************************************************************************
    264  * resource quota state machine
    265  */
    266 
    267 static bool rq_alloc(grpc_resource_quota* resource_quota);
    268 static bool rq_reclaim_from_per_user_free_pool(
    269     grpc_resource_quota* resource_quota);
    270 static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive);
    271 
    272 static void rq_step(void* rq, grpc_error* error) {
    273   grpc_resource_quota* resource_quota = static_cast<grpc_resource_quota*>(rq);
    274   resource_quota->step_scheduled = false;
    275   do {
    276     if (rq_alloc(resource_quota)) goto done;
    277   } while (rq_reclaim_from_per_user_free_pool(resource_quota));
    278 
    279   if (!rq_reclaim(resource_quota, false)) {
    280     rq_reclaim(resource_quota, true);
    281   }
    282 
    283 done:
    284   grpc_resource_quota_unref_internal(resource_quota);
    285 }
    286 
    287 static void rq_step_sched(grpc_resource_quota* resource_quota) {
    288   if (resource_quota->step_scheduled) return;
    289   resource_quota->step_scheduled = true;
    290   grpc_resource_quota_ref_internal(resource_quota);
    291   GRPC_CLOSURE_SCHED(&resource_quota->rq_step_closure, GRPC_ERROR_NONE);
    292 }
    293 
    294 /* update the atomically available resource estimate - use no barriers since
    295    timeliness of delivery really doesn't matter much */
    296 static void rq_update_estimate(grpc_resource_quota* resource_quota) {
    297   gpr_atm memory_usage_estimation = MEMORY_USAGE_ESTIMATION_MAX;
    298   if (resource_quota->size != 0) {
    299     memory_usage_estimation =
    300         GPR_CLAMP((gpr_atm)((1.0 - ((double)resource_quota->free_pool) /
    301                                        ((double)resource_quota->size)) *
    302                             MEMORY_USAGE_ESTIMATION_MAX),
    303                   0, MEMORY_USAGE_ESTIMATION_MAX);
    304   }
    305   gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation,
    306                            memory_usage_estimation);
    307 }
    308 
    309 /* returns true if all allocations are completed */
    310 static bool rq_alloc(grpc_resource_quota* resource_quota) {
    311   grpc_resource_user* resource_user;
    312   while ((resource_user = rulist_pop_head(resource_quota,
    313                                           GRPC_RULIST_AWAITING_ALLOCATION))) {
    314     gpr_mu_lock(&resource_user->mu);
    315     if (grpc_resource_quota_trace.enabled()) {
    316       gpr_log(GPR_INFO,
    317               "RQ: check allocation for user %p shutdown=%" PRIdPTR
    318               " free_pool=%" PRId64,
    319               resource_user, gpr_atm_no_barrier_load(&resource_user->shutdown),
    320               resource_user->free_pool);
    321     }
    322     if (gpr_atm_no_barrier_load(&resource_user->shutdown)) {
    323       resource_user->allocating = false;
    324       grpc_closure_list_fail_all(
    325           &resource_user->on_allocated,
    326           GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown"));
    327       int64_t aborted_allocations = resource_user->outstanding_allocations;
    328       resource_user->outstanding_allocations = 0;
    329       resource_user->free_pool += aborted_allocations;
    330       GRPC_CLOSURE_LIST_SCHED(&resource_user->on_allocated);
    331       gpr_mu_unlock(&resource_user->mu);
    332       ru_unref_by(resource_user, static_cast<gpr_atm>(aborted_allocations));
    333       continue;
    334     }
    335     if (resource_user->free_pool < 0 &&
    336         -resource_user->free_pool <= resource_quota->free_pool) {
    337       int64_t amt = -resource_user->free_pool;
    338       resource_user->free_pool = 0;
    339       resource_quota->free_pool -= amt;
    340       rq_update_estimate(resource_quota);
    341       if (grpc_resource_quota_trace.enabled()) {
    342         gpr_log(GPR_INFO,
    343                 "RQ %s %s: grant alloc %" PRId64
    344                 " bytes; rq_free_pool -> %" PRId64,
    345                 resource_quota->name, resource_user->name, amt,
    346                 resource_quota->free_pool);
    347       }
    348     } else if (grpc_resource_quota_trace.enabled() &&
    349                resource_user->free_pool >= 0) {
    350       gpr_log(GPR_INFO, "RQ %s %s: discard already satisfied alloc request",
    351               resource_quota->name, resource_user->name);
    352     }
    353     if (resource_user->free_pool >= 0) {
    354       resource_user->allocating = false;
    355       resource_user->outstanding_allocations = 0;
    356       GRPC_CLOSURE_LIST_SCHED(&resource_user->on_allocated);
    357       gpr_mu_unlock(&resource_user->mu);
    358     } else {
    359       rulist_add_head(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
    360       gpr_mu_unlock(&resource_user->mu);
    361       return false;
    362     }
    363   }
    364   return true;
    365 }
    366 
    367 /* returns true if any memory could be reclaimed from buffers */
    368 static bool rq_reclaim_from_per_user_free_pool(
    369     grpc_resource_quota* resource_quota) {
    370   grpc_resource_user* resource_user;
    371   while ((resource_user = rulist_pop_head(resource_quota,
    372                                           GRPC_RULIST_NON_EMPTY_FREE_POOL))) {
    373     gpr_mu_lock(&resource_user->mu);
    374     if (resource_user->free_pool > 0) {
    375       int64_t amt = resource_user->free_pool;
    376       resource_user->free_pool = 0;
    377       resource_quota->free_pool += amt;
    378       rq_update_estimate(resource_quota);
    379       if (grpc_resource_quota_trace.enabled()) {
    380         gpr_log(GPR_INFO,
    381                 "RQ %s %s: reclaim_from_per_user_free_pool %" PRId64
    382                 " bytes; rq_free_pool -> %" PRId64,
    383                 resource_quota->name, resource_user->name, amt,
    384                 resource_quota->free_pool);
    385       }
    386       gpr_mu_unlock(&resource_user->mu);
    387       return true;
    388     } else {
    389       gpr_mu_unlock(&resource_user->mu);
    390     }
    391   }
    392   return false;
    393 }
    394 
    395 /* returns true if reclamation is proceeding */
    396 static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive) {
    397   if (resource_quota->reclaiming) return true;
    398   grpc_rulist list = destructive ? GRPC_RULIST_RECLAIMER_DESTRUCTIVE
    399                                  : GRPC_RULIST_RECLAIMER_BENIGN;
    400   grpc_resource_user* resource_user = rulist_pop_head(resource_quota, list);
    401   if (resource_user == nullptr) return false;
    402   if (grpc_resource_quota_trace.enabled()) {
    403     gpr_log(GPR_INFO, "RQ %s %s: initiate %s reclamation", resource_quota->name,
    404             resource_user->name, destructive ? "destructive" : "benign");
    405   }
    406   resource_quota->reclaiming = true;
    407   grpc_resource_quota_ref_internal(resource_quota);
    408   grpc_closure* c = resource_user->reclaimers[destructive];
    409   GPR_ASSERT(c);
    410   resource_quota->debug_only_last_reclaimer_resource_user = resource_user;
    411   resource_quota->debug_only_last_initiated_reclaimer = c;
    412   resource_user->reclaimers[destructive] = nullptr;
    413   GRPC_CLOSURE_SCHED(c, GRPC_ERROR_NONE);
    414   return true;
    415 }
    416 
    417 /*******************************************************************************
    418  * ru_slice: a slice implementation that is backed by a grpc_resource_user
    419  */
    420 
    421 typedef struct {
    422   grpc_slice_refcount base;
    423   gpr_refcount refs;
    424   grpc_resource_user* resource_user;
    425   size_t size;
    426 } ru_slice_refcount;
    427 
    428 static void ru_slice_ref(void* p) {
    429   ru_slice_refcount* rc = static_cast<ru_slice_refcount*>(p);
    430   gpr_ref(&rc->refs);
    431 }
    432 
    433 static void ru_slice_unref(void* p) {
    434   ru_slice_refcount* rc = static_cast<ru_slice_refcount*>(p);
    435   if (gpr_unref(&rc->refs)) {
    436     grpc_resource_user_free(rc->resource_user, rc->size);
    437     gpr_free(rc);
    438   }
    439 }
    440 
    441 static const grpc_slice_refcount_vtable ru_slice_vtable = {
    442     ru_slice_ref, ru_slice_unref, grpc_slice_default_eq_impl,
    443     grpc_slice_default_hash_impl};
    444 
    445 static grpc_slice ru_slice_create(grpc_resource_user* resource_user,
    446                                   size_t size) {
    447   ru_slice_refcount* rc = static_cast<ru_slice_refcount*>(
    448       gpr_malloc(sizeof(ru_slice_refcount) + size));
    449   rc->base.vtable = &ru_slice_vtable;
    450   rc->base.sub_refcount = &rc->base;
    451   gpr_ref_init(&rc->refs, 1);
    452   rc->resource_user = resource_user;
    453   rc->size = size;
    454   grpc_slice slice;
    455   slice.refcount = &rc->base;
    456   slice.data.refcounted.bytes = reinterpret_cast<uint8_t*>(rc + 1);
    457   slice.data.refcounted.length = size;
    458   return slice;
    459 }
    460 
    461 /*******************************************************************************
    462  * grpc_resource_quota internal implementation: resource user manipulation under
    463  * the combiner
    464  */
    465 
    466 static void ru_allocate(void* ru, grpc_error* error) {
    467   grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
    468   if (rulist_empty(resource_user->resource_quota,
    469                    GRPC_RULIST_AWAITING_ALLOCATION)) {
    470     rq_step_sched(resource_user->resource_quota);
    471   }
    472   rulist_add_tail(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
    473 }
    474 
    475 static void ru_add_to_free_pool(void* ru, grpc_error* error) {
    476   grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
    477   if (!rulist_empty(resource_user->resource_quota,
    478                     GRPC_RULIST_AWAITING_ALLOCATION) &&
    479       rulist_empty(resource_user->resource_quota,
    480                    GRPC_RULIST_NON_EMPTY_FREE_POOL)) {
    481     rq_step_sched(resource_user->resource_quota);
    482   }
    483   rulist_add_tail(resource_user, GRPC_RULIST_NON_EMPTY_FREE_POOL);
    484 }
    485 
    486 static bool ru_post_reclaimer(grpc_resource_user* resource_user,
    487                               bool destructive) {
    488   grpc_closure* closure = resource_user->new_reclaimers[destructive];
    489   GPR_ASSERT(closure != nullptr);
    490   resource_user->new_reclaimers[destructive] = nullptr;
    491   GPR_ASSERT(resource_user->reclaimers[destructive] == nullptr);
    492   if (gpr_atm_acq_load(&resource_user->shutdown) > 0) {
    493     GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_CANCELLED);
    494     return false;
    495   }
    496   resource_user->reclaimers[destructive] = closure;
    497   return true;
    498 }
    499 
    500 static void ru_post_benign_reclaimer(void* ru, grpc_error* error) {
    501   grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
    502   if (!ru_post_reclaimer(resource_user, false)) return;
    503   if (!rulist_empty(resource_user->resource_quota,
    504                     GRPC_RULIST_AWAITING_ALLOCATION) &&
    505       rulist_empty(resource_user->resource_quota,
    506                    GRPC_RULIST_NON_EMPTY_FREE_POOL) &&
    507       rulist_empty(resource_user->resource_quota,
    508                    GRPC_RULIST_RECLAIMER_BENIGN)) {
    509     rq_step_sched(resource_user->resource_quota);
    510   }
    511   rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
    512 }
    513 
    514 static void ru_post_destructive_reclaimer(void* ru, grpc_error* error) {
    515   grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
    516   if (!ru_post_reclaimer(resource_user, true)) return;
    517   if (!rulist_empty(resource_user->resource_quota,
    518                     GRPC_RULIST_AWAITING_ALLOCATION) &&
    519       rulist_empty(resource_user->resource_quota,
    520                    GRPC_RULIST_NON_EMPTY_FREE_POOL) &&
    521       rulist_empty(resource_user->resource_quota,
    522                    GRPC_RULIST_RECLAIMER_BENIGN) &&
    523       rulist_empty(resource_user->resource_quota,
    524                    GRPC_RULIST_RECLAIMER_DESTRUCTIVE)) {
    525     rq_step_sched(resource_user->resource_quota);
    526   }
    527   rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
    528 }
    529 
    530 static void ru_shutdown(void* ru, grpc_error* error) {
    531   if (grpc_resource_quota_trace.enabled()) {
    532     gpr_log(GPR_INFO, "RU shutdown %p", ru);
    533   }
    534   grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
    535   gpr_mu_lock(&resource_user->mu);
    536   GRPC_CLOSURE_SCHED(resource_user->reclaimers[0], GRPC_ERROR_CANCELLED);
    537   GRPC_CLOSURE_SCHED(resource_user->reclaimers[1], GRPC_ERROR_CANCELLED);
    538   resource_user->reclaimers[0] = nullptr;
    539   resource_user->reclaimers[1] = nullptr;
    540   rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
    541   rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
    542   if (resource_user->allocating) {
    543     rq_step_sched(resource_user->resource_quota);
    544   }
    545   gpr_mu_unlock(&resource_user->mu);
    546 }
    547 
    548 static void ru_destroy(void* ru, grpc_error* error) {
    549   grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
    550   GPR_ASSERT(gpr_atm_no_barrier_load(&resource_user->refs) == 0);
    551   // Free all the remaining thread quota
    552   grpc_resource_user_free_threads(resource_user,
    553                                   static_cast<int>(gpr_atm_no_barrier_load(
    554                                       &resource_user->num_threads_allocated)));
    555 
    556   for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
    557     rulist_remove(resource_user, static_cast<grpc_rulist>(i));
    558   }
    559   GRPC_CLOSURE_SCHED(resource_user->reclaimers[0], GRPC_ERROR_CANCELLED);
    560   GRPC_CLOSURE_SCHED(resource_user->reclaimers[1], GRPC_ERROR_CANCELLED);
    561   if (resource_user->free_pool != 0) {
    562     resource_user->resource_quota->free_pool += resource_user->free_pool;
    563     rq_step_sched(resource_user->resource_quota);
    564   }
    565   grpc_resource_quota_unref_internal(resource_user->resource_quota);
    566   gpr_mu_destroy(&resource_user->mu);
    567   gpr_free(resource_user->name);
    568   gpr_free(resource_user);
    569 }
    570 
    571 static void ru_allocated_slices(void* arg, grpc_error* error) {
    572   grpc_resource_user_slice_allocator* slice_allocator =
    573       static_cast<grpc_resource_user_slice_allocator*>(arg);
    574   if (error == GRPC_ERROR_NONE) {
    575     for (size_t i = 0; i < slice_allocator->count; i++) {
    576       grpc_slice_buffer_add_indexed(
    577           slice_allocator->dest, ru_slice_create(slice_allocator->resource_user,
    578                                                  slice_allocator->length));
    579     }
    580   }
    581   GRPC_CLOSURE_RUN(&slice_allocator->on_done, GRPC_ERROR_REF(error));
    582 }
    583 
    584 /*******************************************************************************
    585  * grpc_resource_quota internal implementation: quota manipulation under the
    586  * combiner
    587  */
    588 
    589 typedef struct {
    590   int64_t size;
    591   grpc_resource_quota* resource_quota;
    592   grpc_closure closure;
    593 } rq_resize_args;
    594 
    595 static void rq_resize(void* args, grpc_error* error) {
    596   rq_resize_args* a = static_cast<rq_resize_args*>(args);
    597   int64_t delta = a->size - a->resource_quota->size;
    598   a->resource_quota->size += delta;
    599   a->resource_quota->free_pool += delta;
    600   rq_update_estimate(a->resource_quota);
    601   rq_step_sched(a->resource_quota);
    602   grpc_resource_quota_unref_internal(a->resource_quota);
    603   gpr_free(a);
    604 }
    605 
    606 static void rq_reclamation_done(void* rq, grpc_error* error) {
    607   grpc_resource_quota* resource_quota = static_cast<grpc_resource_quota*>(rq);
    608   resource_quota->reclaiming = false;
    609   rq_step_sched(resource_quota);
    610   grpc_resource_quota_unref_internal(resource_quota);
    611 }
    612 
    613 /*******************************************************************************
    614  * grpc_resource_quota api
    615  */
    616 
    617 /* Public API */
    618 grpc_resource_quota* grpc_resource_quota_create(const char* name) {
    619   grpc_resource_quota* resource_quota =
    620       static_cast<grpc_resource_quota*>(gpr_malloc(sizeof(*resource_quota)));
    621   gpr_ref_init(&resource_quota->refs, 1);
    622   resource_quota->combiner = grpc_combiner_create();
    623   resource_quota->free_pool = INT64_MAX;
    624   resource_quota->size = INT64_MAX;
    625   gpr_atm_no_barrier_store(&resource_quota->last_size, GPR_ATM_MAX);
    626   gpr_mu_init(&resource_quota->thread_count_mu);
    627   resource_quota->max_threads = INT_MAX;
    628   resource_quota->num_threads_allocated = 0;
    629   resource_quota->step_scheduled = false;
    630   resource_quota->reclaiming = false;
    631   gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation, 0);
    632   if (name != nullptr) {
    633     resource_quota->name = gpr_strdup(name);
    634   } else {
    635     gpr_asprintf(&resource_quota->name, "anonymous_pool_%" PRIxPTR,
    636                  (intptr_t)resource_quota);
    637   }
    638   GRPC_CLOSURE_INIT(&resource_quota->rq_step_closure, rq_step, resource_quota,
    639                     grpc_combiner_finally_scheduler(resource_quota->combiner));
    640   GRPC_CLOSURE_INIT(&resource_quota->rq_reclamation_done_closure,
    641                     rq_reclamation_done, resource_quota,
    642                     grpc_combiner_scheduler(resource_quota->combiner));
    643   for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
    644     resource_quota->roots[i] = nullptr;
    645   }
    646   return resource_quota;
    647 }
    648 
    649 void grpc_resource_quota_unref_internal(grpc_resource_quota* resource_quota) {
    650   if (gpr_unref(&resource_quota->refs)) {
    651     // No outstanding thread quota
    652     GPR_ASSERT(resource_quota->num_threads_allocated == 0);
    653     GRPC_COMBINER_UNREF(resource_quota->combiner, "resource_quota");
    654     gpr_free(resource_quota->name);
    655     gpr_free(resource_quota);
    656   }
    657 }
    658 
    659 /* Public API */
    660 void grpc_resource_quota_unref(grpc_resource_quota* resource_quota) {
    661   grpc_core::ExecCtx exec_ctx;
    662   grpc_resource_quota_unref_internal(resource_quota);
    663 }
    664 
    665 grpc_resource_quota* grpc_resource_quota_ref_internal(
    666     grpc_resource_quota* resource_quota) {
    667   gpr_ref(&resource_quota->refs);
    668   return resource_quota;
    669 }
    670 
    671 /* Public API */
    672 void grpc_resource_quota_ref(grpc_resource_quota* resource_quota) {
    673   grpc_resource_quota_ref_internal(resource_quota);
    674 }
    675 
    676 double grpc_resource_quota_get_memory_pressure(
    677     grpc_resource_quota* resource_quota) {
    678   return (static_cast<double>(gpr_atm_no_barrier_load(
    679              &resource_quota->memory_usage_estimation))) /
    680          (static_cast<double>(MEMORY_USAGE_ESTIMATION_MAX));
    681 }
    682 
    683 /* Public API */
    684 void grpc_resource_quota_set_max_threads(grpc_resource_quota* resource_quota,
    685                                          int new_max_threads) {
    686   GPR_ASSERT(new_max_threads >= 0);
    687   gpr_mu_lock(&resource_quota->thread_count_mu);
    688   resource_quota->max_threads = new_max_threads;
    689   gpr_mu_unlock(&resource_quota->thread_count_mu);
    690 }
    691 
    692 /* Public API */
    693 void grpc_resource_quota_resize(grpc_resource_quota* resource_quota,
    694                                 size_t size) {
    695   grpc_core::ExecCtx exec_ctx;
    696   rq_resize_args* a = static_cast<rq_resize_args*>(gpr_malloc(sizeof(*a)));
    697   a->resource_quota = grpc_resource_quota_ref_internal(resource_quota);
    698   a->size = static_cast<int64_t>(size);
    699   gpr_atm_no_barrier_store(&resource_quota->last_size,
    700                            (gpr_atm)GPR_MIN((size_t)GPR_ATM_MAX, size));
    701   GRPC_CLOSURE_INIT(&a->closure, rq_resize, a, grpc_schedule_on_exec_ctx);
    702   GRPC_CLOSURE_SCHED(&a->closure, GRPC_ERROR_NONE);
    703 }
    704 
    705 size_t grpc_resource_quota_peek_size(grpc_resource_quota* resource_quota) {
    706   return static_cast<size_t>(
    707       gpr_atm_no_barrier_load(&resource_quota->last_size));
    708 }
    709 
    710 /*******************************************************************************
    711  * grpc_resource_user channel args api
    712  */
    713 
    714 grpc_resource_quota* grpc_resource_quota_from_channel_args(
    715     const grpc_channel_args* channel_args) {
    716   for (size_t i = 0; i < channel_args->num_args; i++) {
    717     if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
    718       if (channel_args->args[i].type == GRPC_ARG_POINTER) {
    719         return grpc_resource_quota_ref_internal(
    720             static_cast<grpc_resource_quota*>(
    721                 channel_args->args[i].value.pointer.p));
    722       } else {
    723         gpr_log(GPR_DEBUG, GRPC_ARG_RESOURCE_QUOTA " should be a pointer");
    724       }
    725     }
    726   }
    727   return grpc_resource_quota_create(nullptr);
    728 }
    729 
    730 static void* rq_copy(void* rq) {
    731   grpc_resource_quota_ref(static_cast<grpc_resource_quota*>(rq));
    732   return rq;
    733 }
    734 
    735 static void rq_destroy(void* rq) {
    736   grpc_resource_quota_unref_internal(static_cast<grpc_resource_quota*>(rq));
    737 }
    738 
    739 static int rq_cmp(void* a, void* b) { return GPR_ICMP(a, b); }
    740 
    741 const grpc_arg_pointer_vtable* grpc_resource_quota_arg_vtable(void) {
    742   static const grpc_arg_pointer_vtable vtable = {rq_copy, rq_destroy, rq_cmp};
    743   return &vtable;
    744 }
    745 
    746 /*******************************************************************************
    747  * grpc_resource_user api
    748  */
    749 
    750 grpc_resource_user* grpc_resource_user_create(
    751     grpc_resource_quota* resource_quota, const char* name) {
    752   grpc_resource_user* resource_user =
    753       static_cast<grpc_resource_user*>(gpr_malloc(sizeof(*resource_user)));
    754   resource_user->resource_quota =
    755       grpc_resource_quota_ref_internal(resource_quota);
    756   GRPC_CLOSURE_INIT(&resource_user->allocate_closure, &ru_allocate,
    757                     resource_user,
    758                     grpc_combiner_scheduler(resource_quota->combiner));
    759   GRPC_CLOSURE_INIT(&resource_user->add_to_free_pool_closure,
    760                     &ru_add_to_free_pool, resource_user,
    761                     grpc_combiner_scheduler(resource_quota->combiner));
    762   GRPC_CLOSURE_INIT(&resource_user->post_reclaimer_closure[0],
    763                     &ru_post_benign_reclaimer, resource_user,
    764                     grpc_combiner_scheduler(resource_quota->combiner));
    765   GRPC_CLOSURE_INIT(&resource_user->post_reclaimer_closure[1],
    766                     &ru_post_destructive_reclaimer, resource_user,
    767                     grpc_combiner_scheduler(resource_quota->combiner));
    768   GRPC_CLOSURE_INIT(&resource_user->destroy_closure, &ru_destroy, resource_user,
    769                     grpc_combiner_scheduler(resource_quota->combiner));
    770   gpr_mu_init(&resource_user->mu);
    771   gpr_atm_rel_store(&resource_user->refs, 1);
    772   gpr_atm_rel_store(&resource_user->shutdown, 0);
    773   resource_user->free_pool = 0;
    774   grpc_closure_list_init(&resource_user->on_allocated);
    775   resource_user->allocating = false;
    776   resource_user->added_to_free_pool = false;
    777   gpr_atm_no_barrier_store(&resource_user->num_threads_allocated, 0);
    778   resource_user->reclaimers[0] = nullptr;
    779   resource_user->reclaimers[1] = nullptr;
    780   resource_user->new_reclaimers[0] = nullptr;
    781   resource_user->new_reclaimers[1] = nullptr;
    782   resource_user->outstanding_allocations = 0;
    783   for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
    784     resource_user->links[i].next = resource_user->links[i].prev = nullptr;
    785   }
    786   if (name != nullptr) {
    787     resource_user->name = gpr_strdup(name);
    788   } else {
    789     gpr_asprintf(&resource_user->name, "anonymous_resource_user_%" PRIxPTR,
    790                  (intptr_t)resource_user);
    791   }
    792   return resource_user;
    793 }
    794 
    795 grpc_resource_quota* grpc_resource_user_quota(
    796     grpc_resource_user* resource_user) {
    797   return resource_user->resource_quota;
    798 }
    799 
    800 static void ru_ref_by(grpc_resource_user* resource_user, gpr_atm amount) {
    801   GPR_ASSERT(amount > 0);
    802   GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&resource_user->refs, amount) != 0);
    803 }
    804 
    805 static void ru_unref_by(grpc_resource_user* resource_user, gpr_atm amount) {
    806   GPR_ASSERT(amount > 0);
    807   gpr_atm old = gpr_atm_full_fetch_add(&resource_user->refs, -amount);
    808   GPR_ASSERT(old >= amount);
    809   if (old == amount) {
    810     GRPC_CLOSURE_SCHED(&resource_user->destroy_closure, GRPC_ERROR_NONE);
    811   }
    812 }
    813 
    814 void grpc_resource_user_ref(grpc_resource_user* resource_user) {
    815   ru_ref_by(resource_user, 1);
    816 }
    817 
    818 void grpc_resource_user_unref(grpc_resource_user* resource_user) {
    819   ru_unref_by(resource_user, 1);
    820 }
    821 
    822 void grpc_resource_user_shutdown(grpc_resource_user* resource_user) {
    823   if (gpr_atm_full_fetch_add(&resource_user->shutdown, 1) == 0) {
    824     GRPC_CLOSURE_SCHED(
    825         GRPC_CLOSURE_CREATE(
    826             ru_shutdown, resource_user,
    827             grpc_combiner_scheduler(resource_user->resource_quota->combiner)),
    828         GRPC_ERROR_NONE);
    829   }
    830 }
    831 
    832 bool grpc_resource_user_allocate_threads(grpc_resource_user* resource_user,
    833                                          int thread_count) {
    834   GPR_ASSERT(thread_count >= 0);
    835   bool is_success = false;
    836   gpr_mu_lock(&resource_user->resource_quota->thread_count_mu);
    837   grpc_resource_quota* rq = resource_user->resource_quota;
    838   if (rq->num_threads_allocated + thread_count <= rq->max_threads) {
    839     rq->num_threads_allocated += thread_count;
    840     gpr_atm_no_barrier_fetch_add(&resource_user->num_threads_allocated,
    841                                  thread_count);
    842     is_success = true;
    843   }
    844   gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu);
    845   return is_success;
    846 }
    847 
    848 void grpc_resource_user_free_threads(grpc_resource_user* resource_user,
    849                                      int thread_count) {
    850   GPR_ASSERT(thread_count >= 0);
    851   gpr_mu_lock(&resource_user->resource_quota->thread_count_mu);
    852   grpc_resource_quota* rq = resource_user->resource_quota;
    853   rq->num_threads_allocated -= thread_count;
    854   int old_count = static_cast<int>(gpr_atm_no_barrier_fetch_add(
    855       &resource_user->num_threads_allocated, -thread_count));
    856   if (old_count < thread_count || rq->num_threads_allocated < 0) {
    857     gpr_log(GPR_ERROR,
    858             "Releasing more threads (%d) than currently allocated (rq threads: "
    859             "%d, ru threads: %d)",
    860             thread_count, rq->num_threads_allocated + thread_count, old_count);
    861     abort();
    862   }
    863   gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu);
    864 }
    865 
    866 void grpc_resource_user_alloc(grpc_resource_user* resource_user, size_t size,
    867                               grpc_closure* optional_on_done) {
    868   gpr_mu_lock(&resource_user->mu);
    869   ru_ref_by(resource_user, static_cast<gpr_atm>(size));
    870   resource_user->free_pool -= static_cast<int64_t>(size);
    871   resource_user->outstanding_allocations += static_cast<int64_t>(size);
    872   if (grpc_resource_quota_trace.enabled()) {
    873     gpr_log(GPR_INFO, "RQ %s %s: alloc %" PRIdPTR "; free_pool -> %" PRId64,
    874             resource_user->resource_quota->name, resource_user->name, size,
    875             resource_user->free_pool);
    876   }
    877   if (resource_user->free_pool < 0) {
    878     grpc_closure_list_append(&resource_user->on_allocated, optional_on_done,
    879                              GRPC_ERROR_NONE);
    880     if (!resource_user->allocating) {
    881       resource_user->allocating = true;
    882       GRPC_CLOSURE_SCHED(&resource_user->allocate_closure, GRPC_ERROR_NONE);
    883     }
    884   } else {
    885     resource_user->outstanding_allocations -= static_cast<int64_t>(size);
    886     GRPC_CLOSURE_SCHED(optional_on_done, GRPC_ERROR_NONE);
    887   }
    888   gpr_mu_unlock(&resource_user->mu);
    889 }
    890 
    891 void grpc_resource_user_free(grpc_resource_user* resource_user, size_t size) {
    892   gpr_mu_lock(&resource_user->mu);
    893   bool was_zero_or_negative = resource_user->free_pool <= 0;
    894   resource_user->free_pool += static_cast<int64_t>(size);
    895   if (grpc_resource_quota_trace.enabled()) {
    896     gpr_log(GPR_INFO, "RQ %s %s: free %" PRIdPTR "; free_pool -> %" PRId64,
    897             resource_user->resource_quota->name, resource_user->name, size,
    898             resource_user->free_pool);
    899   }
    900   bool is_bigger_than_zero = resource_user->free_pool > 0;
    901   if (is_bigger_than_zero && was_zero_or_negative &&
    902       !resource_user->added_to_free_pool) {
    903     resource_user->added_to_free_pool = true;
    904     GRPC_CLOSURE_SCHED(&resource_user->add_to_free_pool_closure,
    905                        GRPC_ERROR_NONE);
    906   }
    907   gpr_mu_unlock(&resource_user->mu);
    908   ru_unref_by(resource_user, static_cast<gpr_atm>(size));
    909 }
    910 
    911 void grpc_resource_user_post_reclaimer(grpc_resource_user* resource_user,
    912                                        bool destructive,
    913                                        grpc_closure* closure) {
    914   GPR_ASSERT(resource_user->new_reclaimers[destructive] == nullptr);
    915   resource_user->new_reclaimers[destructive] = closure;
    916   GRPC_CLOSURE_SCHED(&resource_user->post_reclaimer_closure[destructive],
    917                      GRPC_ERROR_NONE);
    918 }
    919 
    920 void grpc_resource_user_finish_reclamation(grpc_resource_user* resource_user) {
    921   if (grpc_resource_quota_trace.enabled()) {
    922     gpr_log(GPR_INFO, "RQ %s %s: reclamation complete",
    923             resource_user->resource_quota->name, resource_user->name);
    924   }
    925   GRPC_CLOSURE_SCHED(
    926       &resource_user->resource_quota->rq_reclamation_done_closure,
    927       GRPC_ERROR_NONE);
    928 }
    929 
    930 void grpc_resource_user_slice_allocator_init(
    931     grpc_resource_user_slice_allocator* slice_allocator,
    932     grpc_resource_user* resource_user, grpc_iomgr_cb_func cb, void* p) {
    933   GRPC_CLOSURE_INIT(&slice_allocator->on_allocated, ru_allocated_slices,
    934                     slice_allocator, grpc_schedule_on_exec_ctx);
    935   GRPC_CLOSURE_INIT(&slice_allocator->on_done, cb, p,
    936                     grpc_schedule_on_exec_ctx);
    937   slice_allocator->resource_user = resource_user;
    938 }
    939 
    940 void grpc_resource_user_alloc_slices(
    941     grpc_resource_user_slice_allocator* slice_allocator, size_t length,
    942     size_t count, grpc_slice_buffer* dest) {
    943   slice_allocator->length = length;
    944   slice_allocator->count = count;
    945   slice_allocator->dest = dest;
    946   grpc_resource_user_alloc(slice_allocator->resource_user, count * length,
    947                            &slice_allocator->on_allocated);
    948 }
    949