1 /****************************************************************************** 2 * 3 * Copyright (C) 2014 Google, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at: 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 ******************************************************************************/ 18 19 #define LOG_TAG "bt_osi_eager_reader" 20 21 #include "osi/include/eager_reader.h" 22 23 #include <assert.h> 24 #include <errno.h> 25 #include <string.h> 26 #include <sys/eventfd.h> 27 #include <unistd.h> 28 29 #include "osi/include/fixed_queue.h" 30 #include "osi/include/log.h" 31 #include "osi/include/osi.h" 32 #include "osi/include/reactor.h" 33 34 #if !defined(EFD_SEMAPHORE) 35 # define EFD_SEMAPHORE (1 << 0) 36 #endif 37 38 typedef struct { 39 size_t length; 40 size_t offset; 41 uint8_t data[]; 42 } data_buffer_t; 43 44 struct eager_reader_t { 45 int bytes_available_fd; // semaphore mode eventfd which counts the number of available bytes 46 int inbound_fd; 47 48 const allocator_t *allocator; 49 size_t buffer_size; 50 fixed_queue_t *buffers; 51 data_buffer_t *current_buffer; 52 53 thread_t *inbound_read_thread; 54 reactor_object_t *inbound_read_object; 55 56 reactor_object_t *outbound_registration; 57 eager_reader_cb outbound_read_ready; 58 void *outbound_context; 59 }; 60 61 static bool has_byte(const eager_reader_t *reader); 62 static void inbound_data_waiting(void *context); 63 static void internal_outbound_read_ready(void *context); 64 65 eager_reader_t *eager_reader_new( 66 int fd_to_read, 67 const allocator_t *allocator, 68 size_t buffer_size, 69 size_t max_buffer_count, 70 const char *thread_name) { 71 72 assert(fd_to_read != INVALID_FD); 73 assert(allocator != NULL); 74 assert(buffer_size > 0); 75 assert(max_buffer_count > 0); 76 assert(thread_name != NULL && *thread_name != '\0'); 77 78 eager_reader_t *ret = osi_calloc(sizeof(eager_reader_t)); 79 80 ret->allocator = allocator; 81 ret->inbound_fd = fd_to_read; 82 83 ret->bytes_available_fd = eventfd(0, 0); 84 if (ret->bytes_available_fd == INVALID_FD) { 85 LOG_ERROR(LOG_TAG, "%s unable to create output reading semaphore.", __func__); 86 goto error; 87 } 88 89 ret->buffer_size = buffer_size; 90 91 ret->buffers = fixed_queue_new(max_buffer_count); 92 if (!ret->buffers) { 93 LOG_ERROR(LOG_TAG, "%s unable to create buffers queue.", __func__); 94 goto error; 95 } 96 97 ret->inbound_read_thread = thread_new(thread_name); 98 if (!ret->inbound_read_thread) { 99 LOG_ERROR(LOG_TAG, "%s unable to make reading thread.", __func__); 100 goto error; 101 } 102 103 ret->inbound_read_object = reactor_register( 104 thread_get_reactor(ret->inbound_read_thread), 105 fd_to_read, 106 ret, 107 inbound_data_waiting, 108 NULL 109 ); 110 111 return ret; 112 113 error:; 114 eager_reader_free(ret); 115 return NULL; 116 } 117 118 void eager_reader_free(eager_reader_t *reader) { 119 if (!reader) 120 return; 121 122 eager_reader_unregister(reader); 123 124 // Only unregister from the input if we actually did register 125 if (reader->inbound_read_object) 126 reactor_unregister(reader->inbound_read_object); 127 128 if (reader->bytes_available_fd != INVALID_FD) 129 close(reader->bytes_available_fd); 130 131 // Free the current buffer, because it's not in the queue 132 // and won't be freed below 133 if (reader->current_buffer) 134 reader->allocator->free(reader->current_buffer); 135 136 fixed_queue_free(reader->buffers, reader->allocator->free); 137 thread_free(reader->inbound_read_thread); 138 osi_free(reader); 139 } 140 141 void eager_reader_register(eager_reader_t *reader, reactor_t *reactor, eager_reader_cb read_cb, void *context) { 142 assert(reader != NULL); 143 assert(reactor != NULL); 144 assert(read_cb != NULL); 145 146 // Make sure the reader isn't currently registered. 147 eager_reader_unregister(reader); 148 149 reader->outbound_read_ready = read_cb; 150 reader->outbound_context = context; 151 reader->outbound_registration = reactor_register(reactor, reader->bytes_available_fd, reader, internal_outbound_read_ready, NULL); 152 } 153 154 void eager_reader_unregister(eager_reader_t *reader) { 155 assert(reader != NULL); 156 157 if (reader->outbound_registration) { 158 reactor_unregister(reader->outbound_registration); 159 reader->outbound_registration = NULL; 160 } 161 } 162 163 // SEE HEADER FOR THREAD SAFETY NOTE 164 size_t eager_reader_read(eager_reader_t *reader, uint8_t *buffer, size_t max_size) { 165 assert(reader != NULL); 166 assert(buffer != NULL); 167 168 // Poll to see if we have any bytes available before reading. 169 if (!has_byte(reader)) 170 return 0; 171 172 // Find out how many bytes we have available in our various buffers. 173 eventfd_t bytes_available; 174 if (eventfd_read(reader->bytes_available_fd, &bytes_available) == -1) { 175 LOG_ERROR(LOG_TAG, "%s unable to read semaphore for output data.", __func__); 176 return 0; 177 } 178 179 if (max_size > bytes_available) 180 max_size = bytes_available; 181 182 size_t bytes_consumed = 0; 183 while (bytes_consumed < max_size) { 184 if (!reader->current_buffer) 185 reader->current_buffer = fixed_queue_dequeue(reader->buffers); 186 187 size_t bytes_to_copy = reader->current_buffer->length - reader->current_buffer->offset; 188 if (bytes_to_copy > (max_size - bytes_consumed)) 189 bytes_to_copy = max_size - bytes_consumed; 190 191 memcpy(&buffer[bytes_consumed], &reader->current_buffer->data[reader->current_buffer->offset], bytes_to_copy); 192 bytes_consumed += bytes_to_copy; 193 reader->current_buffer->offset += bytes_to_copy; 194 195 if (reader->current_buffer->offset >= reader->current_buffer->length) { 196 reader->allocator->free(reader->current_buffer); 197 reader->current_buffer = NULL; 198 } 199 } 200 201 bytes_available -= bytes_consumed; 202 if (eventfd_write(reader->bytes_available_fd, bytes_available) == -1) { 203 LOG_ERROR(LOG_TAG, "%s unable to write back bytes available for output data.", __func__); 204 } 205 206 return bytes_consumed; 207 } 208 209 thread_t* eager_reader_get_read_thread(const eager_reader_t *reader) { 210 assert(reader != NULL); 211 return reader->inbound_read_thread; 212 } 213 214 static bool has_byte(const eager_reader_t *reader) { 215 assert(reader != NULL); 216 217 fd_set read_fds; 218 219 for (;;) { 220 FD_ZERO(&read_fds); 221 FD_SET(reader->bytes_available_fd, &read_fds); 222 223 // Immediate timeout 224 struct timeval timeout; 225 timeout.tv_sec = 0; 226 timeout.tv_usec = 0; 227 228 int ret = select(reader->bytes_available_fd + 1, &read_fds, NULL, NULL, 229 &timeout); 230 if (ret == -1 && errno == EINTR) 231 continue; 232 break; 233 } 234 235 return FD_ISSET(reader->bytes_available_fd, &read_fds); 236 } 237 238 static void inbound_data_waiting(void *context) { 239 eager_reader_t *reader = (eager_reader_t *)context; 240 241 data_buffer_t *buffer = (data_buffer_t *)reader->allocator->alloc(reader->buffer_size + sizeof(data_buffer_t)); 242 if (!buffer) { 243 LOG_ERROR(LOG_TAG, "%s couldn't aquire memory for inbound data buffer.", __func__); 244 return; 245 } 246 247 buffer->length = 0; 248 buffer->offset = 0; 249 250 ssize_t bytes_read; 251 OSI_NO_INTR(bytes_read = read(reader->inbound_fd, buffer->data, 252 reader->buffer_size)); 253 if (bytes_read > 0) { 254 // Save the data for later 255 buffer->length = bytes_read; 256 fixed_queue_enqueue(reader->buffers, buffer); 257 258 // Tell consumers data is available by incrementing 259 // the semaphore by the number of bytes we just read 260 eventfd_write(reader->bytes_available_fd, bytes_read); 261 } else { 262 if (bytes_read == 0) 263 LOG_WARN(LOG_TAG, "%s fd said bytes existed, but none were found.", __func__); 264 else 265 LOG_WARN(LOG_TAG, "%s unable to read from file descriptor: %s", __func__, strerror(errno)); 266 267 reader->allocator->free(buffer); 268 } 269 } 270 271 static void internal_outbound_read_ready(void *context) { 272 assert(context != NULL); 273 274 eager_reader_t *reader = (eager_reader_t *)context; 275 reader->outbound_read_ready(reader, reader->outbound_context); 276 } 277