Home | History | Annotate | Download | only in src
      1 /******************************************************************************
      2  *
      3  *  Copyright 2014 Google, Inc.
      4  *
      5  *  Licensed under the Apache License, Version 2.0 (the "License");
      6  *  you may not use this file except in compliance with the License.
      7  *  You may obtain a copy of the License at:
      8  *
      9  *  http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  *  Unless required by applicable law or agreed to in writing, software
     12  *  distributed under the License is distributed on an "AS IS" BASIS,
     13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  *  See the License for the specific language governing permissions and
     15  *  limitations under the License.
     16  *
     17  ******************************************************************************/
     18 
     19 #include <base/logging.h>
     20 #include <string.h>
     21 
     22 #include <mutex>
     23 
     24 #include "osi/include/allocator.h"
     25 #include "osi/include/fixed_queue.h"
     26 #include "osi/include/list.h"
     27 #include "osi/include/osi.h"
     28 #include "osi/include/reactor.h"
     29 #include "osi/include/semaphore.h"
     30 
     31 typedef struct fixed_queue_t {
     32   list_t* list;
     33   semaphore_t* enqueue_sem;
     34   semaphore_t* dequeue_sem;
     35   std::mutex* mutex;
     36   size_t capacity;
     37 
     38   reactor_object_t* dequeue_object;
     39   fixed_queue_cb dequeue_ready;
     40   void* dequeue_context;
     41 } fixed_queue_t;
     42 
     43 static void internal_dequeue_ready(void* context);
     44 
     45 fixed_queue_t* fixed_queue_new(size_t capacity) {
     46   fixed_queue_t* ret =
     47       static_cast<fixed_queue_t*>(osi_calloc(sizeof(fixed_queue_t)));
     48 
     49   ret->mutex = new std::mutex;
     50   ret->capacity = capacity;
     51 
     52   ret->list = list_new(NULL);
     53   if (!ret->list) goto error;
     54 
     55   ret->enqueue_sem = semaphore_new(capacity);
     56   if (!ret->enqueue_sem) goto error;
     57 
     58   ret->dequeue_sem = semaphore_new(0);
     59   if (!ret->dequeue_sem) goto error;
     60 
     61   return ret;
     62 
     63 error:
     64   fixed_queue_free(ret, NULL);
     65   return NULL;
     66 }
     67 
     68 void fixed_queue_free(fixed_queue_t* queue, fixed_queue_free_cb free_cb) {
     69   if (!queue) return;
     70 
     71   fixed_queue_unregister_dequeue(queue);
     72 
     73   if (free_cb)
     74     for (const list_node_t* node = list_begin(queue->list);
     75          node != list_end(queue->list); node = list_next(node))
     76       free_cb(list_node(node));
     77 
     78   list_free(queue->list);
     79   semaphore_free(queue->enqueue_sem);
     80   semaphore_free(queue->dequeue_sem);
     81   delete queue->mutex;
     82   osi_free(queue);
     83 }
     84 
     85 void fixed_queue_flush(fixed_queue_t* queue, fixed_queue_free_cb free_cb) {
     86   if (!queue) return;
     87 
     88   while (!fixed_queue_is_empty(queue)) {
     89     void* data = fixed_queue_try_dequeue(queue);
     90     if (free_cb != NULL) {
     91       free_cb(data);
     92     }
     93   }
     94 }
     95 
     96 bool fixed_queue_is_empty(fixed_queue_t* queue) {
     97   if (queue == NULL) return true;
     98 
     99   std::lock_guard<std::mutex> lock(*queue->mutex);
    100   return list_is_empty(queue->list);
    101 }
    102 
    103 size_t fixed_queue_length(fixed_queue_t* queue) {
    104   if (queue == NULL) return 0;
    105 
    106   std::lock_guard<std::mutex> lock(*queue->mutex);
    107   return list_length(queue->list);
    108 }
    109 
    110 size_t fixed_queue_capacity(fixed_queue_t* queue) {
    111   CHECK(queue != NULL);
    112 
    113   return queue->capacity;
    114 }
    115 
    116 void fixed_queue_enqueue(fixed_queue_t* queue, void* data) {
    117   CHECK(queue != NULL);
    118   CHECK(data != NULL);
    119 
    120   semaphore_wait(queue->enqueue_sem);
    121 
    122   {
    123     std::lock_guard<std::mutex> lock(*queue->mutex);
    124     list_append(queue->list, data);
    125   }
    126 
    127   semaphore_post(queue->dequeue_sem);
    128 }
    129 
    130 void* fixed_queue_dequeue(fixed_queue_t* queue) {
    131   CHECK(queue != NULL);
    132 
    133   semaphore_wait(queue->dequeue_sem);
    134 
    135   void* ret = NULL;
    136   {
    137     std::lock_guard<std::mutex> lock(*queue->mutex);
    138     ret = list_front(queue->list);
    139     list_remove(queue->list, ret);
    140   }
    141 
    142   semaphore_post(queue->enqueue_sem);
    143 
    144   return ret;
    145 }
    146 
    147 bool fixed_queue_try_enqueue(fixed_queue_t* queue, void* data) {
    148   CHECK(queue != NULL);
    149   CHECK(data != NULL);
    150 
    151   if (!semaphore_try_wait(queue->enqueue_sem)) return false;
    152 
    153   {
    154     std::lock_guard<std::mutex> lock(*queue->mutex);
    155     list_append(queue->list, data);
    156   }
    157 
    158   semaphore_post(queue->dequeue_sem);
    159   return true;
    160 }
    161 
    162 void* fixed_queue_try_dequeue(fixed_queue_t* queue) {
    163   if (queue == NULL) return NULL;
    164 
    165   if (!semaphore_try_wait(queue->dequeue_sem)) return NULL;
    166 
    167   void* ret = NULL;
    168   {
    169     std::lock_guard<std::mutex> lock(*queue->mutex);
    170     ret = list_front(queue->list);
    171     list_remove(queue->list, ret);
    172   }
    173 
    174   semaphore_post(queue->enqueue_sem);
    175 
    176   return ret;
    177 }
    178 
    179 void* fixed_queue_try_peek_first(fixed_queue_t* queue) {
    180   if (queue == NULL) return NULL;
    181 
    182   std::lock_guard<std::mutex> lock(*queue->mutex);
    183   return list_is_empty(queue->list) ? NULL : list_front(queue->list);
    184 }
    185 
    186 void* fixed_queue_try_peek_last(fixed_queue_t* queue) {
    187   if (queue == NULL) return NULL;
    188 
    189   std::lock_guard<std::mutex> lock(*queue->mutex);
    190   return list_is_empty(queue->list) ? NULL : list_back(queue->list);
    191 }
    192 
    193 void* fixed_queue_try_remove_from_queue(fixed_queue_t* queue, void* data) {
    194   if (queue == NULL) return NULL;
    195 
    196   bool removed = false;
    197   {
    198     std::lock_guard<std::mutex> lock(*queue->mutex);
    199     if (list_contains(queue->list, data) &&
    200         semaphore_try_wait(queue->dequeue_sem)) {
    201       removed = list_remove(queue->list, data);
    202       CHECK(removed);
    203     }
    204   }
    205 
    206   if (removed) {
    207     semaphore_post(queue->enqueue_sem);
    208     return data;
    209   }
    210   return NULL;
    211 }
    212 
    213 list_t* fixed_queue_get_list(fixed_queue_t* queue) {
    214   CHECK(queue != NULL);
    215 
    216   // NOTE: Using the list in this way is not thread-safe.
    217   // Using this list in any context where threads can call other functions
    218   // to the queue can break our assumptions and the queue in general.
    219   return queue->list;
    220 }
    221 
    222 int fixed_queue_get_dequeue_fd(const fixed_queue_t* queue) {
    223   CHECK(queue != NULL);
    224   return semaphore_get_fd(queue->dequeue_sem);
    225 }
    226 
    227 int fixed_queue_get_enqueue_fd(const fixed_queue_t* queue) {
    228   CHECK(queue != NULL);
    229   return semaphore_get_fd(queue->enqueue_sem);
    230 }
    231 
    232 void fixed_queue_register_dequeue(fixed_queue_t* queue, reactor_t* reactor,
    233                                   fixed_queue_cb ready_cb, void* context) {
    234   CHECK(queue != NULL);
    235   CHECK(reactor != NULL);
    236   CHECK(ready_cb != NULL);
    237 
    238   // Make sure we're not already registered
    239   fixed_queue_unregister_dequeue(queue);
    240 
    241   queue->dequeue_ready = ready_cb;
    242   queue->dequeue_context = context;
    243   queue->dequeue_object =
    244       reactor_register(reactor, fixed_queue_get_dequeue_fd(queue), queue,
    245                        internal_dequeue_ready, NULL);
    246 }
    247 
    248 void fixed_queue_unregister_dequeue(fixed_queue_t* queue) {
    249   CHECK(queue != NULL);
    250 
    251   if (queue->dequeue_object) {
    252     reactor_unregister(queue->dequeue_object);
    253     queue->dequeue_object = NULL;
    254   }
    255 }
    256 
    257 static void internal_dequeue_ready(void* context) {
    258   CHECK(context != NULL);
    259 
    260   fixed_queue_t* queue = static_cast<fixed_queue_t*>(context);
    261   queue->dequeue_ready(queue, queue->dequeue_context);
    262 }
    263