Home | History | Annotate | Download | only in src
      1 /******************************************************************************
      2  *
      3  *  Copyright (C) 2014 Google, Inc.
      4  *
      5  *  Licensed under the Apache License, Version 2.0 (the "License");
      6  *  you may not use this file except in compliance with the License.
      7  *  You may obtain a copy of the License at:
      8  *
      9  *  http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  *  Unless required by applicable law or agreed to in writing, software
     12  *  distributed under the License is distributed on an "AS IS" BASIS,
     13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  *  See the License for the specific language governing permissions and
     15  *  limitations under the License.
     16  *
     17  ******************************************************************************/
     18 
     19 #define LOG_TAG "bt_osi_eager_reader"
     20 
     21 #include "osi/include/eager_reader.h"
     22 
     23 #include <assert.h>
     24 #include <errno.h>
     25 #include <string.h>
     26 #include <sys/eventfd.h>
     27 #include <unistd.h>
     28 
     29 #include "osi/include/fixed_queue.h"
     30 #include "osi/include/log.h"
     31 #include "osi/include/osi.h"
     32 #include "osi/include/reactor.h"
     33 
     34 #if !defined(EFD_SEMAPHORE)
     35 #  define EFD_SEMAPHORE (1 << 0)
     36 #endif
     37 
     38 typedef struct {
     39   size_t length;
     40   size_t offset;
     41   uint8_t data[];
     42 } data_buffer_t;
     43 
     44 struct eager_reader_t {
     45   int bytes_available_fd; // semaphore mode eventfd which counts the number of available bytes
     46   int inbound_fd;
     47 
     48   const allocator_t *allocator;
     49   size_t buffer_size;
     50   fixed_queue_t *buffers;
     51   data_buffer_t *current_buffer;
     52 
     53   thread_t *inbound_read_thread;
     54   reactor_object_t *inbound_read_object;
     55 
     56   reactor_object_t *outbound_registration;
     57   eager_reader_cb outbound_read_ready;
     58   void *outbound_context;
     59 };
     60 
     61 static bool has_byte(const eager_reader_t *reader);
     62 static void inbound_data_waiting(void *context);
     63 static void internal_outbound_read_ready(void *context);
     64 
     65 eager_reader_t *eager_reader_new(
     66     int fd_to_read,
     67     const allocator_t *allocator,
     68     size_t buffer_size,
     69     size_t max_buffer_count,
     70     const char *thread_name) {
     71 
     72   assert(fd_to_read != INVALID_FD);
     73   assert(allocator != NULL);
     74   assert(buffer_size > 0);
     75   assert(max_buffer_count > 0);
     76   assert(thread_name != NULL && *thread_name != '\0');
     77 
     78   eager_reader_t *ret = osi_calloc(sizeof(eager_reader_t));
     79 
     80   ret->allocator = allocator;
     81   ret->inbound_fd = fd_to_read;
     82 
     83   ret->bytes_available_fd = eventfd(0, 0);
     84   if (ret->bytes_available_fd == INVALID_FD) {
     85     LOG_ERROR(LOG_TAG, "%s unable to create output reading semaphore.", __func__);
     86     goto error;
     87   }
     88 
     89   ret->buffer_size = buffer_size;
     90 
     91   ret->buffers = fixed_queue_new(max_buffer_count);
     92   if (!ret->buffers) {
     93     LOG_ERROR(LOG_TAG, "%s unable to create buffers queue.", __func__);
     94     goto error;
     95   }
     96 
     97   ret->inbound_read_thread = thread_new(thread_name);
     98   if (!ret->inbound_read_thread) {
     99     LOG_ERROR(LOG_TAG, "%s unable to make reading thread.", __func__);
    100     goto error;
    101   }
    102 
    103   ret->inbound_read_object = reactor_register(
    104     thread_get_reactor(ret->inbound_read_thread),
    105     fd_to_read,
    106     ret,
    107     inbound_data_waiting,
    108     NULL
    109   );
    110 
    111   return ret;
    112 
    113 error:;
    114   eager_reader_free(ret);
    115   return NULL;
    116 }
    117 
    118 void eager_reader_free(eager_reader_t *reader) {
    119   if (!reader)
    120     return;
    121 
    122   eager_reader_unregister(reader);
    123 
    124   // Only unregister from the input if we actually did register
    125   if (reader->inbound_read_object)
    126     reactor_unregister(reader->inbound_read_object);
    127 
    128   if (reader->bytes_available_fd != INVALID_FD)
    129     close(reader->bytes_available_fd);
    130 
    131   // Free the current buffer, because it's not in the queue
    132   // and won't be freed below
    133   if (reader->current_buffer)
    134     reader->allocator->free(reader->current_buffer);
    135 
    136   fixed_queue_free(reader->buffers, reader->allocator->free);
    137   thread_free(reader->inbound_read_thread);
    138   osi_free(reader);
    139 }
    140 
    141 void eager_reader_register(eager_reader_t *reader, reactor_t *reactor, eager_reader_cb read_cb, void *context) {
    142   assert(reader != NULL);
    143   assert(reactor != NULL);
    144   assert(read_cb != NULL);
    145 
    146   // Make sure the reader isn't currently registered.
    147   eager_reader_unregister(reader);
    148 
    149   reader->outbound_read_ready = read_cb;
    150   reader->outbound_context = context;
    151   reader->outbound_registration = reactor_register(reactor, reader->bytes_available_fd, reader, internal_outbound_read_ready, NULL);
    152 }
    153 
    154 void eager_reader_unregister(eager_reader_t *reader) {
    155   assert(reader != NULL);
    156 
    157   if (reader->outbound_registration) {
    158     reactor_unregister(reader->outbound_registration);
    159     reader->outbound_registration = NULL;
    160   }
    161 }
    162 
    163 // SEE HEADER FOR THREAD SAFETY NOTE
    164 size_t eager_reader_read(eager_reader_t *reader, uint8_t *buffer, size_t max_size) {
    165   assert(reader != NULL);
    166   assert(buffer != NULL);
    167 
    168   // Poll to see if we have any bytes available before reading.
    169   if (!has_byte(reader))
    170     return 0;
    171 
    172   // Find out how many bytes we have available in our various buffers.
    173   eventfd_t bytes_available;
    174   if (eventfd_read(reader->bytes_available_fd, &bytes_available) == -1) {
    175     LOG_ERROR(LOG_TAG, "%s unable to read semaphore for output data.", __func__);
    176     return 0;
    177   }
    178 
    179   if (max_size > bytes_available)
    180     max_size = bytes_available;
    181 
    182   size_t bytes_consumed = 0;
    183   while (bytes_consumed < max_size) {
    184     if (!reader->current_buffer)
    185       reader->current_buffer = fixed_queue_dequeue(reader->buffers);
    186 
    187     size_t bytes_to_copy = reader->current_buffer->length - reader->current_buffer->offset;
    188     if (bytes_to_copy > (max_size - bytes_consumed))
    189       bytes_to_copy = max_size - bytes_consumed;
    190 
    191     memcpy(&buffer[bytes_consumed], &reader->current_buffer->data[reader->current_buffer->offset], bytes_to_copy);
    192     bytes_consumed += bytes_to_copy;
    193     reader->current_buffer->offset += bytes_to_copy;
    194 
    195     if (reader->current_buffer->offset >= reader->current_buffer->length) {
    196       reader->allocator->free(reader->current_buffer);
    197       reader->current_buffer = NULL;
    198     }
    199   }
    200 
    201   bytes_available -= bytes_consumed;
    202   if (eventfd_write(reader->bytes_available_fd, bytes_available) == -1) {
    203     LOG_ERROR(LOG_TAG, "%s unable to write back bytes available for output data.", __func__);
    204   }
    205 
    206   return bytes_consumed;
    207 }
    208 
    209 thread_t* eager_reader_get_read_thread(const eager_reader_t *reader) {
    210   assert(reader != NULL);
    211   return reader->inbound_read_thread;
    212 }
    213 
    214 static bool has_byte(const eager_reader_t *reader) {
    215   assert(reader != NULL);
    216 
    217   fd_set read_fds;
    218 
    219   for (;;) {
    220     FD_ZERO(&read_fds);
    221     FD_SET(reader->bytes_available_fd, &read_fds);
    222 
    223     // Immediate timeout
    224     struct timeval timeout;
    225     timeout.tv_sec = 0;
    226     timeout.tv_usec = 0;
    227 
    228     int ret = select(reader->bytes_available_fd + 1, &read_fds, NULL, NULL,
    229                      &timeout);
    230     if (ret == -1 && errno == EINTR)
    231       continue;
    232     break;
    233   }
    234 
    235   return FD_ISSET(reader->bytes_available_fd, &read_fds);
    236 }
    237 
    238 static void inbound_data_waiting(void *context) {
    239   eager_reader_t *reader = (eager_reader_t *)context;
    240 
    241   data_buffer_t *buffer = (data_buffer_t *)reader->allocator->alloc(reader->buffer_size + sizeof(data_buffer_t));
    242   if (!buffer) {
    243     LOG_ERROR(LOG_TAG, "%s couldn't aquire memory for inbound data buffer.", __func__);
    244     return;
    245   }
    246 
    247   buffer->length = 0;
    248   buffer->offset = 0;
    249 
    250   ssize_t bytes_read;
    251   OSI_NO_INTR(bytes_read = read(reader->inbound_fd, buffer->data,
    252                                 reader->buffer_size));
    253   if (bytes_read > 0) {
    254     // Save the data for later
    255     buffer->length = bytes_read;
    256     fixed_queue_enqueue(reader->buffers, buffer);
    257 
    258     // Tell consumers data is available by incrementing
    259     // the semaphore by the number of bytes we just read
    260     eventfd_write(reader->bytes_available_fd, bytes_read);
    261   } else {
    262     if (bytes_read == 0)
    263       LOG_WARN(LOG_TAG, "%s fd said bytes existed, but none were found.", __func__);
    264     else
    265       LOG_WARN(LOG_TAG, "%s unable to read from file descriptor: %s", __func__, strerror(errno));
    266 
    267     reader->allocator->free(buffer);
    268   }
    269 }
    270 
    271 static void internal_outbound_read_ready(void *context) {
    272   assert(context != NULL);
    273 
    274   eager_reader_t *reader = (eager_reader_t *)context;
    275   reader->outbound_read_ready(reader, reader->outbound_context);
    276 }
    277