1 /* 2 * 3 * Copyright 2016 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #include <grpc/support/port_platform.h> 20 21 #include "src/core/lib/gpr/mpscq.h" 22 23 #include <grpc/support/log.h> 24 25 void gpr_mpscq_init(gpr_mpscq* q) { 26 gpr_atm_no_barrier_store(&q->head, (gpr_atm)&q->stub); 27 q->tail = &q->stub; 28 gpr_atm_no_barrier_store(&q->stub.next, (gpr_atm)NULL); 29 } 30 31 void gpr_mpscq_destroy(gpr_mpscq* q) { 32 GPR_ASSERT(gpr_atm_no_barrier_load(&q->head) == (gpr_atm)&q->stub); 33 GPR_ASSERT(q->tail == &q->stub); 34 } 35 36 bool gpr_mpscq_push(gpr_mpscq* q, gpr_mpscq_node* n) { 37 gpr_atm_no_barrier_store(&n->next, (gpr_atm)NULL); 38 gpr_mpscq_node* prev = 39 (gpr_mpscq_node*)gpr_atm_full_xchg(&q->head, (gpr_atm)n); 40 gpr_atm_rel_store(&prev->next, (gpr_atm)n); 41 return prev == &q->stub; 42 } 43 44 gpr_mpscq_node* gpr_mpscq_pop(gpr_mpscq* q) { 45 bool empty; 46 return gpr_mpscq_pop_and_check_end(q, &empty); 47 } 48 49 gpr_mpscq_node* gpr_mpscq_pop_and_check_end(gpr_mpscq* q, bool* empty) { 50 gpr_mpscq_node* tail = q->tail; 51 gpr_mpscq_node* next = (gpr_mpscq_node*)gpr_atm_acq_load(&tail->next); 52 if (tail == &q->stub) { 53 // indicates the list is actually (ephemerally) empty 54 if (next == nullptr) { 55 *empty = true; 56 return nullptr; 57 } 58 q->tail = next; 59 tail = next; 60 next = (gpr_mpscq_node*)gpr_atm_acq_load(&tail->next); 61 } 62 if (next != nullptr) { 63 *empty = false; 64 q->tail = next; 65 return tail; 66 } 67 gpr_mpscq_node* head = (gpr_mpscq_node*)gpr_atm_acq_load(&q->head); 68 if (tail != head) { 69 *empty = false; 70 // indicates a retry is in order: we're still adding 71 return nullptr; 72 } 73 gpr_mpscq_push(q, &q->stub); 74 next = (gpr_mpscq_node*)gpr_atm_acq_load(&tail->next); 75 if (next != nullptr) { 76 *empty = false; 77 q->tail = next; 78 return tail; 79 } 80 // indicates a retry is in order: we're still adding 81 *empty = false; 82 return nullptr; 83 } 84 85 void gpr_locked_mpscq_init(gpr_locked_mpscq* q) { 86 gpr_mpscq_init(&q->queue); 87 gpr_mu_init(&q->mu); 88 } 89 90 void gpr_locked_mpscq_destroy(gpr_locked_mpscq* q) { 91 gpr_mpscq_destroy(&q->queue); 92 gpr_mu_destroy(&q->mu); 93 } 94 95 bool gpr_locked_mpscq_push(gpr_locked_mpscq* q, gpr_mpscq_node* n) { 96 return gpr_mpscq_push(&q->queue, n); 97 } 98 99 gpr_mpscq_node* gpr_locked_mpscq_try_pop(gpr_locked_mpscq* q) { 100 if (gpr_mu_trylock(&q->mu)) { 101 gpr_mpscq_node* n = gpr_mpscq_pop(&q->queue); 102 gpr_mu_unlock(&q->mu); 103 return n; 104 } 105 return nullptr; 106 } 107 108 gpr_mpscq_node* gpr_locked_mpscq_pop(gpr_locked_mpscq* q) { 109 gpr_mu_lock(&q->mu); 110 bool empty = false; 111 gpr_mpscq_node* n; 112 do { 113 n = gpr_mpscq_pop_and_check_end(&q->queue, &empty); 114 } while (n == nullptr && !empty); 115 gpr_mu_unlock(&q->mu); 116 return n; 117 } 118