1 /****************************************************************************** 2 * 3 * Copyright (C) 2014 Google, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at: 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 ******************************************************************************/ 18 19 #define LOG_TAG "bt_osi_reactor" 20 21 #include "osi/include/reactor.h" 22 23 #include <assert.h> 24 #include <errno.h> 25 #include <pthread.h> 26 #include <stdlib.h> 27 #include <string.h> 28 #include <sys/epoll.h> 29 #include <sys/eventfd.h> 30 #include <unistd.h> 31 32 #include "osi/include/allocator.h" 33 #include "osi/include/list.h" 34 #include "osi/include/log.h" 35 36 #if !defined(EFD_SEMAPHORE) 37 # define EFD_SEMAPHORE (1 << 0) 38 #endif 39 40 struct reactor_t { 41 int epoll_fd; 42 int event_fd; 43 pthread_mutex_t list_lock; // protects invalidation_list. 44 list_t *invalidation_list; // reactor objects that have been unregistered. 45 pthread_t run_thread; // the pthread on which reactor_run is executing. 46 bool is_running; // indicates whether |run_thread| is valid. 47 bool object_removed; 48 }; 49 50 struct reactor_object_t { 51 int fd; // the file descriptor to monitor for events. 52 void *context; // a context that's passed back to the *_ready functions. 53 reactor_t *reactor; // the reactor instance this object is registered with. 54 pthread_mutex_t lock; // protects the lifetime of this object and all variables. 55 56 void (*read_ready)(void *context); // function to call when the file descriptor becomes readable. 57 void (*write_ready)(void *context); // function to call when the file descriptor becomes writeable. 58 }; 59 60 static reactor_status_t run_reactor(reactor_t *reactor, int iterations); 61 62 static const size_t MAX_EVENTS = 64; 63 static const eventfd_t EVENT_REACTOR_STOP = 1; 64 65 reactor_t *reactor_new(void) { 66 reactor_t *ret = (reactor_t *)osi_calloc(sizeof(reactor_t)); 67 68 ret->epoll_fd = INVALID_FD; 69 ret->event_fd = INVALID_FD; 70 71 ret->epoll_fd = epoll_create(MAX_EVENTS); 72 if (ret->epoll_fd == INVALID_FD) { 73 LOG_ERROR(LOG_TAG, "%s unable to create epoll instance: %s", __func__, strerror(errno)); 74 goto error; 75 } 76 77 ret->event_fd = eventfd(0, 0); 78 if (ret->event_fd == INVALID_FD) { 79 LOG_ERROR(LOG_TAG, "%s unable to create eventfd: %s", __func__, strerror(errno)); 80 goto error; 81 } 82 83 pthread_mutex_init(&ret->list_lock, NULL); 84 ret->invalidation_list = list_new(NULL); 85 if (!ret->invalidation_list) { 86 LOG_ERROR(LOG_TAG, "%s unable to allocate object invalidation list.", __func__); 87 goto error; 88 } 89 90 struct epoll_event event; 91 memset(&event, 0, sizeof(event)); 92 event.events = EPOLLIN; 93 event.data.ptr = NULL; 94 if (epoll_ctl(ret->epoll_fd, EPOLL_CTL_ADD, ret->event_fd, &event) == -1) { 95 LOG_ERROR(LOG_TAG, "%s unable to register eventfd with epoll set: %s", __func__, strerror(errno)); 96 goto error; 97 } 98 99 return ret; 100 101 error:; 102 reactor_free(ret); 103 return NULL; 104 } 105 106 void reactor_free(reactor_t *reactor) { 107 if (!reactor) 108 return; 109 110 list_free(reactor->invalidation_list); 111 close(reactor->event_fd); 112 close(reactor->epoll_fd); 113 osi_free(reactor); 114 } 115 116 reactor_status_t reactor_start(reactor_t *reactor) { 117 assert(reactor != NULL); 118 return run_reactor(reactor, 0); 119 } 120 121 reactor_status_t reactor_run_once(reactor_t *reactor) { 122 assert(reactor != NULL); 123 return run_reactor(reactor, 1); 124 } 125 126 void reactor_stop(reactor_t *reactor) { 127 assert(reactor != NULL); 128 129 eventfd_write(reactor->event_fd, EVENT_REACTOR_STOP); 130 } 131 132 reactor_object_t *reactor_register(reactor_t *reactor, 133 int fd, void *context, 134 void (*read_ready)(void *context), 135 void (*write_ready)(void *context)) { 136 assert(reactor != NULL); 137 assert(fd != INVALID_FD); 138 139 reactor_object_t *object = 140 (reactor_object_t *)osi_calloc(sizeof(reactor_object_t)); 141 142 object->reactor = reactor; 143 object->fd = fd; 144 object->context = context; 145 object->read_ready = read_ready; 146 object->write_ready = write_ready; 147 pthread_mutex_init(&object->lock, NULL); 148 149 struct epoll_event event; 150 memset(&event, 0, sizeof(event)); 151 if (read_ready) 152 event.events |= (EPOLLIN | EPOLLRDHUP); 153 if (write_ready) 154 event.events |= EPOLLOUT; 155 event.data.ptr = object; 156 157 if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) { 158 LOG_ERROR(LOG_TAG, "%s unable to register fd %d to epoll set: %s", __func__, fd, strerror(errno)); 159 pthread_mutex_destroy(&object->lock); 160 osi_free(object); 161 return NULL; 162 } 163 164 return object; 165 } 166 167 bool reactor_change_registration(reactor_object_t *object, 168 void (*read_ready)(void *context), 169 void (*write_ready)(void *context)) { 170 assert(object != NULL); 171 172 struct epoll_event event; 173 memset(&event, 0, sizeof(event)); 174 if (read_ready) 175 event.events |= (EPOLLIN | EPOLLRDHUP); 176 if (write_ready) 177 event.events |= EPOLLOUT; 178 event.data.ptr = object; 179 180 if (epoll_ctl(object->reactor->epoll_fd, EPOLL_CTL_MOD, object->fd, &event) == -1) { 181 LOG_ERROR(LOG_TAG, "%s unable to modify interest set for fd %d: %s", __func__, object->fd, strerror(errno)); 182 return false; 183 } 184 185 pthread_mutex_lock(&object->lock); 186 object->read_ready = read_ready; 187 object->write_ready = write_ready; 188 pthread_mutex_unlock(&object->lock); 189 190 return true; 191 } 192 193 void reactor_unregister(reactor_object_t *obj) { 194 assert(obj != NULL); 195 196 reactor_t *reactor = obj->reactor; 197 198 if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_DEL, obj->fd, NULL) == -1) 199 LOG_ERROR(LOG_TAG, "%s unable to unregister fd %d from epoll set: %s", __func__, obj->fd, strerror(errno)); 200 201 if (reactor->is_running && pthread_equal(pthread_self(), reactor->run_thread)) { 202 reactor->object_removed = true; 203 return; 204 } 205 206 pthread_mutex_lock(&reactor->list_lock); 207 list_append(reactor->invalidation_list, obj); 208 pthread_mutex_unlock(&reactor->list_lock); 209 210 // Taking the object lock here makes sure a callback for |obj| isn't 211 // currently executing. The reactor thread must then either be before 212 // the callbacks or after. If after, we know that the object won't be 213 // referenced because it has been taken out of the epoll set. If before, 214 // it won't be referenced because the reactor thread will check the 215 // invalidation_list and find it in there. So by taking this lock, we 216 // are waiting until the reactor thread drops all references to |obj|. 217 // One the wait completes, we can unlock and destroy |obj| safely. 218 pthread_mutex_lock(&obj->lock); 219 pthread_mutex_unlock(&obj->lock); 220 pthread_mutex_destroy(&obj->lock); 221 osi_free(obj); 222 } 223 224 // Runs the reactor loop for a maximum of |iterations|. 225 // 0 |iterations| means loop forever. 226 // |reactor| may not be NULL. 227 static reactor_status_t run_reactor(reactor_t *reactor, int iterations) { 228 assert(reactor != NULL); 229 230 reactor->run_thread = pthread_self(); 231 reactor->is_running = true; 232 233 struct epoll_event events[MAX_EVENTS]; 234 for (int i = 0; iterations == 0 || i < iterations; ++i) { 235 pthread_mutex_lock(&reactor->list_lock); 236 list_clear(reactor->invalidation_list); 237 pthread_mutex_unlock(&reactor->list_lock); 238 239 int ret; 240 OSI_NO_INTR(ret = epoll_wait(reactor->epoll_fd, events, MAX_EVENTS, -1)); 241 if (ret == -1) { 242 LOG_ERROR(LOG_TAG, "%s error in epoll_wait: %s", __func__, strerror(errno)); 243 reactor->is_running = false; 244 return REACTOR_STATUS_ERROR; 245 } 246 247 for (int j = 0; j < ret; ++j) { 248 // The event file descriptor is the only one that registers with 249 // a NULL data pointer. We use the NULL to identify it and break 250 // out of the reactor loop. 251 if (events[j].data.ptr == NULL) { 252 eventfd_t value; 253 eventfd_read(reactor->event_fd, &value); 254 reactor->is_running = false; 255 return REACTOR_STATUS_STOP; 256 } 257 258 reactor_object_t *object = (reactor_object_t *)events[j].data.ptr; 259 260 pthread_mutex_lock(&reactor->list_lock); 261 if (list_contains(reactor->invalidation_list, object)) { 262 pthread_mutex_unlock(&reactor->list_lock); 263 continue; 264 } 265 266 // Downgrade the list lock to an object lock. 267 pthread_mutex_lock(&object->lock); 268 pthread_mutex_unlock(&reactor->list_lock); 269 270 reactor->object_removed = false; 271 if (events[j].events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) && object->read_ready) 272 object->read_ready(object->context); 273 if (!reactor->object_removed && events[j].events & EPOLLOUT && object->write_ready) 274 object->write_ready(object->context); 275 pthread_mutex_unlock(&object->lock); 276 277 if (reactor->object_removed) { 278 pthread_mutex_destroy(&object->lock); 279 osi_free(object); 280 } 281 } 282 } 283 284 reactor->is_running = false; 285 return REACTOR_STATUS_DONE; 286 } 287