Home | History | Annotate | Download | only in src
      1 /******************************************************************************
      2  *
      3  *  Copyright (C) 2014 Google, Inc.
      4  *
      5  *  Licensed under the Apache License, Version 2.0 (the "License");
      6  *  you may not use this file except in compliance with the License.
      7  *  You may obtain a copy of the License at:
      8  *
      9  *  http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  *  Unless required by applicable law or agreed to in writing, software
     12  *  distributed under the License is distributed on an "AS IS" BASIS,
     13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  *  See the License for the specific language governing permissions and
     15  *  limitations under the License.
     16  *
     17  ******************************************************************************/
     18 
     19 #include <assert.h>
     20 #include <pthread.h>
     21 #include <string.h>
     22 
     23 #include "osi/include/allocator.h"
     24 #include "osi/include/fixed_queue.h"
     25 #include "osi/include/list.h"
     26 #include "osi/include/osi.h"
     27 #include "osi/include/semaphore.h"
     28 #include "osi/include/reactor.h"
     29 
     30 typedef struct fixed_queue_t {
     31   list_t *list;
     32   semaphore_t *enqueue_sem;
     33   semaphore_t *dequeue_sem;
     34   pthread_mutex_t lock;
     35   size_t capacity;
     36 
     37   reactor_object_t *dequeue_object;
     38   fixed_queue_cb dequeue_ready;
     39   void *dequeue_context;
     40 } fixed_queue_t;
     41 
     42 static void internal_dequeue_ready(void *context);
     43 
     44 fixed_queue_t *fixed_queue_new(size_t capacity) {
     45   fixed_queue_t *ret = osi_calloc(sizeof(fixed_queue_t));
     46 
     47   pthread_mutex_init(&ret->lock, NULL);
     48   ret->capacity = capacity;
     49 
     50   ret->list = list_new(NULL);
     51   if (!ret->list)
     52     goto error;
     53 
     54   ret->enqueue_sem = semaphore_new(capacity);
     55   if (!ret->enqueue_sem)
     56     goto error;
     57 
     58   ret->dequeue_sem = semaphore_new(0);
     59   if (!ret->dequeue_sem)
     60     goto error;
     61 
     62   return ret;
     63 
     64 error:
     65   fixed_queue_free(ret, NULL);
     66   return NULL;
     67 }
     68 
     69 void fixed_queue_free(fixed_queue_t *queue, fixed_queue_free_cb free_cb) {
     70   if (!queue)
     71     return;
     72 
     73   fixed_queue_unregister_dequeue(queue);
     74 
     75   if (free_cb)
     76     for (const list_node_t *node = list_begin(queue->list); node != list_end(queue->list); node = list_next(node))
     77       free_cb(list_node(node));
     78 
     79   list_free(queue->list);
     80   semaphore_free(queue->enqueue_sem);
     81   semaphore_free(queue->dequeue_sem);
     82   pthread_mutex_destroy(&queue->lock);
     83   osi_free(queue);
     84 }
     85 
     86 bool fixed_queue_is_empty(fixed_queue_t *queue) {
     87   if (queue == NULL)
     88     return true;
     89 
     90   pthread_mutex_lock(&queue->lock);
     91   bool is_empty = list_is_empty(queue->list);
     92   pthread_mutex_unlock(&queue->lock);
     93 
     94   return is_empty;
     95 }
     96 
     97 size_t fixed_queue_length(fixed_queue_t *queue) {
     98   if (queue == NULL)
     99     return 0;
    100 
    101   pthread_mutex_lock(&queue->lock);
    102   size_t length = list_length(queue->list);
    103   pthread_mutex_unlock(&queue->lock);
    104 
    105   return length;
    106 }
    107 
    108 size_t fixed_queue_capacity(fixed_queue_t *queue) {
    109   assert(queue != NULL);
    110 
    111   return queue->capacity;
    112 }
    113 
    114 void fixed_queue_enqueue(fixed_queue_t *queue, void *data) {
    115   assert(queue != NULL);
    116   assert(data != NULL);
    117 
    118   semaphore_wait(queue->enqueue_sem);
    119 
    120   pthread_mutex_lock(&queue->lock);
    121   list_append(queue->list, data);
    122   pthread_mutex_unlock(&queue->lock);
    123 
    124   semaphore_post(queue->dequeue_sem);
    125 }
    126 
    127 void *fixed_queue_dequeue(fixed_queue_t *queue) {
    128   assert(queue != NULL);
    129 
    130   semaphore_wait(queue->dequeue_sem);
    131 
    132   pthread_mutex_lock(&queue->lock);
    133   void *ret = list_front(queue->list);
    134   list_remove(queue->list, ret);
    135   pthread_mutex_unlock(&queue->lock);
    136 
    137   semaphore_post(queue->enqueue_sem);
    138 
    139   return ret;
    140 }
    141 
    142 bool fixed_queue_try_enqueue(fixed_queue_t *queue, void *data) {
    143   assert(queue != NULL);
    144   assert(data != NULL);
    145 
    146   if (!semaphore_try_wait(queue->enqueue_sem))
    147     return false;
    148 
    149   pthread_mutex_lock(&queue->lock);
    150   list_append(queue->list, data);
    151   pthread_mutex_unlock(&queue->lock);
    152 
    153   semaphore_post(queue->dequeue_sem);
    154   return true;
    155 }
    156 
    157 void *fixed_queue_try_dequeue(fixed_queue_t *queue) {
    158   if (queue == NULL)
    159     return NULL;
    160 
    161   if (!semaphore_try_wait(queue->dequeue_sem))
    162     return NULL;
    163 
    164   pthread_mutex_lock(&queue->lock);
    165   void *ret = list_front(queue->list);
    166   list_remove(queue->list, ret);
    167   pthread_mutex_unlock(&queue->lock);
    168 
    169   semaphore_post(queue->enqueue_sem);
    170 
    171   return ret;
    172 }
    173 
    174 void *fixed_queue_try_peek_first(fixed_queue_t *queue) {
    175   if (queue == NULL)
    176     return NULL;
    177 
    178   pthread_mutex_lock(&queue->lock);
    179   void *ret = list_is_empty(queue->list) ? NULL : list_front(queue->list);
    180   pthread_mutex_unlock(&queue->lock);
    181 
    182   return ret;
    183 }
    184 
    185 void *fixed_queue_try_peek_last(fixed_queue_t *queue) {
    186   if (queue == NULL)
    187     return NULL;
    188 
    189   pthread_mutex_lock(&queue->lock);
    190   void *ret = list_is_empty(queue->list) ? NULL : list_back(queue->list);
    191   pthread_mutex_unlock(&queue->lock);
    192 
    193   return ret;
    194 }
    195 
    196 void *fixed_queue_try_remove_from_queue(fixed_queue_t *queue, void *data) {
    197   if (queue == NULL)
    198     return NULL;
    199 
    200   bool removed = false;
    201   pthread_mutex_lock(&queue->lock);
    202   if (list_contains(queue->list, data) &&
    203       semaphore_try_wait(queue->dequeue_sem)) {
    204     removed = list_remove(queue->list, data);
    205     assert(removed);
    206   }
    207   pthread_mutex_unlock(&queue->lock);
    208 
    209   if (removed) {
    210     semaphore_post(queue->enqueue_sem);
    211     return data;
    212   }
    213   return NULL;
    214 }
    215 
    216 list_t *fixed_queue_get_list(fixed_queue_t *queue) {
    217   assert(queue != NULL);
    218 
    219   // NOTE: This function is not thread safe, and there is no point for
    220   // calling pthread_mutex_lock() / pthread_mutex_unlock()
    221   return queue->list;
    222 }
    223 
    224 
    225 int fixed_queue_get_dequeue_fd(const fixed_queue_t *queue) {
    226   assert(queue != NULL);
    227   return semaphore_get_fd(queue->dequeue_sem);
    228 }
    229 
    230 int fixed_queue_get_enqueue_fd(const fixed_queue_t *queue) {
    231   assert(queue != NULL);
    232   return semaphore_get_fd(queue->enqueue_sem);
    233 }
    234 
    235 void fixed_queue_register_dequeue(fixed_queue_t *queue, reactor_t *reactor, fixed_queue_cb ready_cb, void *context) {
    236   assert(queue != NULL);
    237   assert(reactor != NULL);
    238   assert(ready_cb != NULL);
    239 
    240   // Make sure we're not already registered
    241   fixed_queue_unregister_dequeue(queue);
    242 
    243   queue->dequeue_ready = ready_cb;
    244   queue->dequeue_context = context;
    245   queue->dequeue_object = reactor_register(
    246     reactor,
    247     fixed_queue_get_dequeue_fd(queue),
    248     queue,
    249     internal_dequeue_ready,
    250     NULL
    251   );
    252 }
    253 
    254 void fixed_queue_unregister_dequeue(fixed_queue_t *queue) {
    255   assert(queue != NULL);
    256 
    257   if (queue->dequeue_object) {
    258     reactor_unregister(queue->dequeue_object);
    259     queue->dequeue_object = NULL;
    260   }
    261 }
    262 
    263 static void internal_dequeue_ready(void *context) {
    264   assert(context != NULL);
    265 
    266   fixed_queue_t *queue = context;
    267   queue->dequeue_ready(queue, queue->dequeue_context);
    268 }
    269