1 /****************************************************************************** 2 * 3 * Copyright (C) 2014 Google, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at: 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 ******************************************************************************/ 18 19 #include <assert.h> 20 #include <pthread.h> 21 #include <stdlib.h> 22 23 #include "fixed_queue.h" 24 #include "list.h" 25 #include "osi.h" 26 #include "semaphore.h" 27 28 typedef struct fixed_queue_t { 29 list_t *list; 30 semaphore_t *enqueue_sem; 31 semaphore_t *dequeue_sem; 32 pthread_mutex_t lock; 33 size_t capacity; 34 } fixed_queue_t; 35 36 fixed_queue_t *fixed_queue_new(size_t capacity) { 37 fixed_queue_t *ret = calloc(1, sizeof(fixed_queue_t)); 38 if (!ret) 39 goto error; 40 41 ret->list = list_new(NULL); 42 if (!ret->list) 43 goto error; 44 45 ret->enqueue_sem = semaphore_new(capacity); 46 if (!ret->enqueue_sem) 47 goto error; 48 49 ret->dequeue_sem = semaphore_new(0); 50 if (!ret->dequeue_sem) 51 goto error; 52 53 pthread_mutex_init(&ret->lock, NULL); 54 ret->capacity = capacity; 55 56 return ret; 57 58 error:; 59 if (ret) { 60 list_free(ret->list); 61 semaphore_free(ret->enqueue_sem); 62 semaphore_free(ret->dequeue_sem); 63 } 64 65 free(ret); 66 return NULL; 67 } 68 69 void fixed_queue_free(fixed_queue_t *queue, fixed_queue_free_cb free_cb) { 70 if (!queue) 71 return; 72 73 if (free_cb) 74 for (const list_node_t *node = list_begin(queue->list); node != list_end(queue->list); node = list_next(node)) 75 free_cb(list_node(node)); 76 77 list_free(queue->list); 78 semaphore_free(queue->enqueue_sem); 79 semaphore_free(queue->dequeue_sem); 80 pthread_mutex_destroy(&queue->lock); 81 free(queue); 82 } 83 84 void fixed_queue_enqueue(fixed_queue_t *queue, void *data) { 85 assert(queue != NULL); 86 assert(data != NULL); 87 88 semaphore_wait(queue->enqueue_sem); 89 90 pthread_mutex_lock(&queue->lock); 91 list_append(queue->list, data); 92 pthread_mutex_unlock(&queue->lock); 93 94 semaphore_post(queue->dequeue_sem); 95 } 96 97 void *fixed_queue_dequeue(fixed_queue_t *queue) { 98 assert(queue != NULL); 99 100 semaphore_wait(queue->dequeue_sem); 101 102 pthread_mutex_lock(&queue->lock); 103 void *ret = list_front(queue->list); 104 list_remove(queue->list, ret); 105 pthread_mutex_unlock(&queue->lock); 106 107 semaphore_post(queue->enqueue_sem); 108 109 return ret; 110 } 111 112 bool fixed_queue_try_enqueue(fixed_queue_t *queue, void *data) { 113 assert(queue != NULL); 114 assert(data != NULL); 115 116 if (!semaphore_try_wait(queue->enqueue_sem)) 117 return false; 118 119 pthread_mutex_lock(&queue->lock); 120 list_append(queue->list, data); 121 pthread_mutex_unlock(&queue->lock); 122 123 semaphore_post(queue->dequeue_sem); 124 return true; 125 } 126 127 void *fixed_queue_try_dequeue(fixed_queue_t *queue) { 128 assert(queue != NULL); 129 130 if (!semaphore_try_wait(queue->dequeue_sem)) 131 return NULL; 132 133 pthread_mutex_lock(&queue->lock); 134 void *ret = list_front(queue->list); 135 list_remove(queue->list, ret); 136 pthread_mutex_unlock(&queue->lock); 137 138 semaphore_post(queue->enqueue_sem); 139 140 return ret; 141 } 142 143 int fixed_queue_get_dequeue_fd(const fixed_queue_t *queue) { 144 assert(queue != NULL); 145 return semaphore_get_fd(queue->dequeue_sem); 146 } 147 148 int fixed_queue_get_enqueue_fd(const fixed_queue_t *queue) { 149 assert(queue != NULL); 150 return semaphore_get_fd(queue->enqueue_sem); 151 } 152