1 /****************************************************************************** 2 * 3 * Copyright (C) 2014 Google, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at: 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 ******************************************************************************/ 18 19 #include <assert.h> 20 #include <pthread.h> 21 #include <string.h> 22 23 #include "osi/include/allocator.h" 24 #include "osi/include/fixed_queue.h" 25 #include "osi/include/list.h" 26 #include "osi/include/osi.h" 27 #include "osi/include/semaphore.h" 28 #include "osi/include/reactor.h" 29 30 typedef struct fixed_queue_t { 31 list_t *list; 32 semaphore_t *enqueue_sem; 33 semaphore_t *dequeue_sem; 34 pthread_mutex_t lock; 35 size_t capacity; 36 37 reactor_object_t *dequeue_object; 38 fixed_queue_cb dequeue_ready; 39 void *dequeue_context; 40 } fixed_queue_t; 41 42 static void internal_dequeue_ready(void *context); 43 44 fixed_queue_t *fixed_queue_new(size_t capacity) { 45 fixed_queue_t *ret = osi_calloc(sizeof(fixed_queue_t)); 46 47 pthread_mutex_init(&ret->lock, NULL); 48 ret->capacity = capacity; 49 50 ret->list = list_new(NULL); 51 if (!ret->list) 52 goto error; 53 54 ret->enqueue_sem = semaphore_new(capacity); 55 if (!ret->enqueue_sem) 56 goto error; 57 58 ret->dequeue_sem = semaphore_new(0); 59 if (!ret->dequeue_sem) 60 goto error; 61 62 return ret; 63 64 error: 65 fixed_queue_free(ret, NULL); 66 return NULL; 67 } 68 69 void fixed_queue_free(fixed_queue_t *queue, fixed_queue_free_cb free_cb) { 70 if (!queue) 71 return; 72 73 fixed_queue_unregister_dequeue(queue); 74 75 if (free_cb) 76 for (const list_node_t *node = list_begin(queue->list); node != list_end(queue->list); node = list_next(node)) 77 free_cb(list_node(node)); 78 79 list_free(queue->list); 80 semaphore_free(queue->enqueue_sem); 81 semaphore_free(queue->dequeue_sem); 82 pthread_mutex_destroy(&queue->lock); 83 osi_free(queue); 84 } 85 86 bool fixed_queue_is_empty(fixed_queue_t *queue) { 87 if (queue == NULL) 88 return true; 89 90 pthread_mutex_lock(&queue->lock); 91 bool is_empty = list_is_empty(queue->list); 92 pthread_mutex_unlock(&queue->lock); 93 94 return is_empty; 95 } 96 97 size_t fixed_queue_length(fixed_queue_t *queue) { 98 if (queue == NULL) 99 return 0; 100 101 pthread_mutex_lock(&queue->lock); 102 size_t length = list_length(queue->list); 103 pthread_mutex_unlock(&queue->lock); 104 105 return length; 106 } 107 108 size_t fixed_queue_capacity(fixed_queue_t *queue) { 109 assert(queue != NULL); 110 111 return queue->capacity; 112 } 113 114 void fixed_queue_enqueue(fixed_queue_t *queue, void *data) { 115 assert(queue != NULL); 116 assert(data != NULL); 117 118 semaphore_wait(queue->enqueue_sem); 119 120 pthread_mutex_lock(&queue->lock); 121 list_append(queue->list, data); 122 pthread_mutex_unlock(&queue->lock); 123 124 semaphore_post(queue->dequeue_sem); 125 } 126 127 void *fixed_queue_dequeue(fixed_queue_t *queue) { 128 assert(queue != NULL); 129 130 semaphore_wait(queue->dequeue_sem); 131 132 pthread_mutex_lock(&queue->lock); 133 void *ret = list_front(queue->list); 134 list_remove(queue->list, ret); 135 pthread_mutex_unlock(&queue->lock); 136 137 semaphore_post(queue->enqueue_sem); 138 139 return ret; 140 } 141 142 bool fixed_queue_try_enqueue(fixed_queue_t *queue, void *data) { 143 assert(queue != NULL); 144 assert(data != NULL); 145 146 if (!semaphore_try_wait(queue->enqueue_sem)) 147 return false; 148 149 pthread_mutex_lock(&queue->lock); 150 list_append(queue->list, data); 151 pthread_mutex_unlock(&queue->lock); 152 153 semaphore_post(queue->dequeue_sem); 154 return true; 155 } 156 157 void *fixed_queue_try_dequeue(fixed_queue_t *queue) { 158 if (queue == NULL) 159 return NULL; 160 161 if (!semaphore_try_wait(queue->dequeue_sem)) 162 return NULL; 163 164 pthread_mutex_lock(&queue->lock); 165 void *ret = list_front(queue->list); 166 list_remove(queue->list, ret); 167 pthread_mutex_unlock(&queue->lock); 168 169 semaphore_post(queue->enqueue_sem); 170 171 return ret; 172 } 173 174 void *fixed_queue_try_peek_first(fixed_queue_t *queue) { 175 if (queue == NULL) 176 return NULL; 177 178 pthread_mutex_lock(&queue->lock); 179 void *ret = list_is_empty(queue->list) ? NULL : list_front(queue->list); 180 pthread_mutex_unlock(&queue->lock); 181 182 return ret; 183 } 184 185 void *fixed_queue_try_peek_last(fixed_queue_t *queue) { 186 if (queue == NULL) 187 return NULL; 188 189 pthread_mutex_lock(&queue->lock); 190 void *ret = list_is_empty(queue->list) ? NULL : list_back(queue->list); 191 pthread_mutex_unlock(&queue->lock); 192 193 return ret; 194 } 195 196 void *fixed_queue_try_remove_from_queue(fixed_queue_t *queue, void *data) { 197 if (queue == NULL) 198 return NULL; 199 200 bool removed = false; 201 pthread_mutex_lock(&queue->lock); 202 if (list_contains(queue->list, data) && 203 semaphore_try_wait(queue->dequeue_sem)) { 204 removed = list_remove(queue->list, data); 205 assert(removed); 206 } 207 pthread_mutex_unlock(&queue->lock); 208 209 if (removed) { 210 semaphore_post(queue->enqueue_sem); 211 return data; 212 } 213 return NULL; 214 } 215 216 list_t *fixed_queue_get_list(fixed_queue_t *queue) { 217 assert(queue != NULL); 218 219 // NOTE: This function is not thread safe, and there is no point for 220 // calling pthread_mutex_lock() / pthread_mutex_unlock() 221 return queue->list; 222 } 223 224 225 int fixed_queue_get_dequeue_fd(const fixed_queue_t *queue) { 226 assert(queue != NULL); 227 return semaphore_get_fd(queue->dequeue_sem); 228 } 229 230 int fixed_queue_get_enqueue_fd(const fixed_queue_t *queue) { 231 assert(queue != NULL); 232 return semaphore_get_fd(queue->enqueue_sem); 233 } 234 235 void fixed_queue_register_dequeue(fixed_queue_t *queue, reactor_t *reactor, fixed_queue_cb ready_cb, void *context) { 236 assert(queue != NULL); 237 assert(reactor != NULL); 238 assert(ready_cb != NULL); 239 240 // Make sure we're not already registered 241 fixed_queue_unregister_dequeue(queue); 242 243 queue->dequeue_ready = ready_cb; 244 queue->dequeue_context = context; 245 queue->dequeue_object = reactor_register( 246 reactor, 247 fixed_queue_get_dequeue_fd(queue), 248 queue, 249 internal_dequeue_ready, 250 NULL 251 ); 252 } 253 254 void fixed_queue_unregister_dequeue(fixed_queue_t *queue) { 255 assert(queue != NULL); 256 257 if (queue->dequeue_object) { 258 reactor_unregister(queue->dequeue_object); 259 queue->dequeue_object = NULL; 260 } 261 } 262 263 static void internal_dequeue_ready(void *context) { 264 assert(context != NULL); 265 266 fixed_queue_t *queue = context; 267 queue->dequeue_ready(queue, queue->dequeue_context); 268 } 269