1 /****************************************************************************** 2 * 3 * Copyright 2014 Google, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at: 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 ******************************************************************************/ 18 19 #include <base/logging.h> 20 #include <string.h> 21 22 #include <mutex> 23 24 #include "osi/include/allocator.h" 25 #include "osi/include/fixed_queue.h" 26 #include "osi/include/list.h" 27 #include "osi/include/osi.h" 28 #include "osi/include/reactor.h" 29 #include "osi/include/semaphore.h" 30 31 typedef struct fixed_queue_t { 32 list_t* list; 33 semaphore_t* enqueue_sem; 34 semaphore_t* dequeue_sem; 35 std::mutex* mutex; 36 size_t capacity; 37 38 reactor_object_t* dequeue_object; 39 fixed_queue_cb dequeue_ready; 40 void* dequeue_context; 41 } fixed_queue_t; 42 43 static void internal_dequeue_ready(void* context); 44 45 fixed_queue_t* fixed_queue_new(size_t capacity) { 46 fixed_queue_t* ret = 47 static_cast<fixed_queue_t*>(osi_calloc(sizeof(fixed_queue_t))); 48 49 ret->mutex = new std::mutex; 50 ret->capacity = capacity; 51 52 ret->list = list_new(NULL); 53 if (!ret->list) goto error; 54 55 ret->enqueue_sem = semaphore_new(capacity); 56 if (!ret->enqueue_sem) goto error; 57 58 ret->dequeue_sem = semaphore_new(0); 59 if (!ret->dequeue_sem) goto error; 60 61 return ret; 62 63 error: 64 fixed_queue_free(ret, NULL); 65 return NULL; 66 } 67 68 void fixed_queue_free(fixed_queue_t* queue, fixed_queue_free_cb free_cb) { 69 if (!queue) return; 70 71 fixed_queue_unregister_dequeue(queue); 72 73 if (free_cb) 74 for (const list_node_t* node = list_begin(queue->list); 75 node != list_end(queue->list); node = list_next(node)) 76 free_cb(list_node(node)); 77 78 list_free(queue->list); 79 semaphore_free(queue->enqueue_sem); 80 semaphore_free(queue->dequeue_sem); 81 delete queue->mutex; 82 osi_free(queue); 83 } 84 85 void fixed_queue_flush(fixed_queue_t* queue, fixed_queue_free_cb free_cb) { 86 if (!queue) return; 87 88 while (!fixed_queue_is_empty(queue)) { 89 void* data = fixed_queue_try_dequeue(queue); 90 if (free_cb != NULL) { 91 free_cb(data); 92 } 93 } 94 } 95 96 bool fixed_queue_is_empty(fixed_queue_t* queue) { 97 if (queue == NULL) return true; 98 99 std::lock_guard<std::mutex> lock(*queue->mutex); 100 return list_is_empty(queue->list); 101 } 102 103 size_t fixed_queue_length(fixed_queue_t* queue) { 104 if (queue == NULL) return 0; 105 106 std::lock_guard<std::mutex> lock(*queue->mutex); 107 return list_length(queue->list); 108 } 109 110 size_t fixed_queue_capacity(fixed_queue_t* queue) { 111 CHECK(queue != NULL); 112 113 return queue->capacity; 114 } 115 116 void fixed_queue_enqueue(fixed_queue_t* queue, void* data) { 117 CHECK(queue != NULL); 118 CHECK(data != NULL); 119 120 semaphore_wait(queue->enqueue_sem); 121 122 { 123 std::lock_guard<std::mutex> lock(*queue->mutex); 124 list_append(queue->list, data); 125 } 126 127 semaphore_post(queue->dequeue_sem); 128 } 129 130 void* fixed_queue_dequeue(fixed_queue_t* queue) { 131 CHECK(queue != NULL); 132 133 semaphore_wait(queue->dequeue_sem); 134 135 void* ret = NULL; 136 { 137 std::lock_guard<std::mutex> lock(*queue->mutex); 138 ret = list_front(queue->list); 139 list_remove(queue->list, ret); 140 } 141 142 semaphore_post(queue->enqueue_sem); 143 144 return ret; 145 } 146 147 bool fixed_queue_try_enqueue(fixed_queue_t* queue, void* data) { 148 CHECK(queue != NULL); 149 CHECK(data != NULL); 150 151 if (!semaphore_try_wait(queue->enqueue_sem)) return false; 152 153 { 154 std::lock_guard<std::mutex> lock(*queue->mutex); 155 list_append(queue->list, data); 156 } 157 158 semaphore_post(queue->dequeue_sem); 159 return true; 160 } 161 162 void* fixed_queue_try_dequeue(fixed_queue_t* queue) { 163 if (queue == NULL) return NULL; 164 165 if (!semaphore_try_wait(queue->dequeue_sem)) return NULL; 166 167 void* ret = NULL; 168 { 169 std::lock_guard<std::mutex> lock(*queue->mutex); 170 ret = list_front(queue->list); 171 list_remove(queue->list, ret); 172 } 173 174 semaphore_post(queue->enqueue_sem); 175 176 return ret; 177 } 178 179 void* fixed_queue_try_peek_first(fixed_queue_t* queue) { 180 if (queue == NULL) return NULL; 181 182 std::lock_guard<std::mutex> lock(*queue->mutex); 183 return list_is_empty(queue->list) ? NULL : list_front(queue->list); 184 } 185 186 void* fixed_queue_try_peek_last(fixed_queue_t* queue) { 187 if (queue == NULL) return NULL; 188 189 std::lock_guard<std::mutex> lock(*queue->mutex); 190 return list_is_empty(queue->list) ? NULL : list_back(queue->list); 191 } 192 193 void* fixed_queue_try_remove_from_queue(fixed_queue_t* queue, void* data) { 194 if (queue == NULL) return NULL; 195 196 bool removed = false; 197 { 198 std::lock_guard<std::mutex> lock(*queue->mutex); 199 if (list_contains(queue->list, data) && 200 semaphore_try_wait(queue->dequeue_sem)) { 201 removed = list_remove(queue->list, data); 202 CHECK(removed); 203 } 204 } 205 206 if (removed) { 207 semaphore_post(queue->enqueue_sem); 208 return data; 209 } 210 return NULL; 211 } 212 213 list_t* fixed_queue_get_list(fixed_queue_t* queue) { 214 CHECK(queue != NULL); 215 216 // NOTE: Using the list in this way is not thread-safe. 217 // Using this list in any context where threads can call other functions 218 // to the queue can break our assumptions and the queue in general. 219 return queue->list; 220 } 221 222 int fixed_queue_get_dequeue_fd(const fixed_queue_t* queue) { 223 CHECK(queue != NULL); 224 return semaphore_get_fd(queue->dequeue_sem); 225 } 226 227 int fixed_queue_get_enqueue_fd(const fixed_queue_t* queue) { 228 CHECK(queue != NULL); 229 return semaphore_get_fd(queue->enqueue_sem); 230 } 231 232 void fixed_queue_register_dequeue(fixed_queue_t* queue, reactor_t* reactor, 233 fixed_queue_cb ready_cb, void* context) { 234 CHECK(queue != NULL); 235 CHECK(reactor != NULL); 236 CHECK(ready_cb != NULL); 237 238 // Make sure we're not already registered 239 fixed_queue_unregister_dequeue(queue); 240 241 queue->dequeue_ready = ready_cb; 242 queue->dequeue_context = context; 243 queue->dequeue_object = 244 reactor_register(reactor, fixed_queue_get_dequeue_fd(queue), queue, 245 internal_dequeue_ready, NULL); 246 } 247 248 void fixed_queue_unregister_dequeue(fixed_queue_t* queue) { 249 CHECK(queue != NULL); 250 251 if (queue->dequeue_object) { 252 reactor_unregister(queue->dequeue_object); 253 queue->dequeue_object = NULL; 254 } 255 } 256 257 static void internal_dequeue_ready(void* context) { 258 CHECK(context != NULL); 259 260 fixed_queue_t* queue = static_cast<fixed_queue_t*>(context); 261 queue->dequeue_ready(queue, queue->dequeue_context); 262 } 263