1 /****************************************************************************** 2 * 3 * Copyright (C) 2014 Google, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at: 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 ******************************************************************************/ 18 19 #define LOG_TAG "bt_osi_reactor" 20 21 #include "osi/include/reactor.h" 22 23 #include <base/logging.h> 24 #include <errno.h> 25 #include <pthread.h> 26 #include <stdlib.h> 27 #include <string.h> 28 #include <sys/epoll.h> 29 #include <sys/eventfd.h> 30 #include <unistd.h> 31 32 #include <mutex> 33 34 #include "osi/include/allocator.h" 35 #include "osi/include/list.h" 36 #include "osi/include/log.h" 37 38 #if !defined(EFD_SEMAPHORE) 39 #define EFD_SEMAPHORE (1 << 0) 40 #endif 41 42 struct reactor_t { 43 int epoll_fd; 44 int event_fd; 45 std::mutex* list_mutex; 46 list_t* invalidation_list; // reactor objects that have been unregistered. 47 pthread_t run_thread; // the pthread on which reactor_run is executing. 48 bool is_running; // indicates whether |run_thread| is valid. 49 bool object_removed; 50 }; 51 52 struct reactor_object_t { 53 int fd; // the file descriptor to monitor for events. 54 void* context; // a context that's passed back to the *_ready functions. 55 reactor_t* reactor; // the reactor instance this object is registered with. 56 std::mutex* mutex; // protects the lifetime of this object and all variables. 57 58 void (*read_ready)(void* context); // function to call when the file 59 // descriptor becomes readable. 60 void (*write_ready)(void* context); // function to call when the file 61 // descriptor becomes writeable. 62 }; 63 64 static reactor_status_t run_reactor(reactor_t* reactor, int iterations); 65 66 static const size_t MAX_EVENTS = 64; 67 static const eventfd_t EVENT_REACTOR_STOP = 1; 68 69 reactor_t* reactor_new(void) { 70 reactor_t* ret = (reactor_t*)osi_calloc(sizeof(reactor_t)); 71 72 ret->epoll_fd = INVALID_FD; 73 ret->event_fd = INVALID_FD; 74 75 ret->epoll_fd = epoll_create(MAX_EVENTS); 76 if (ret->epoll_fd == INVALID_FD) { 77 LOG_ERROR(LOG_TAG, "%s unable to create epoll instance: %s", __func__, 78 strerror(errno)); 79 goto error; 80 } 81 82 ret->event_fd = eventfd(0, 0); 83 if (ret->event_fd == INVALID_FD) { 84 LOG_ERROR(LOG_TAG, "%s unable to create eventfd: %s", __func__, 85 strerror(errno)); 86 goto error; 87 } 88 89 ret->list_mutex = new std::mutex; 90 ret->invalidation_list = list_new(NULL); 91 if (!ret->invalidation_list) { 92 LOG_ERROR(LOG_TAG, "%s unable to allocate object invalidation list.", 93 __func__); 94 goto error; 95 } 96 97 struct epoll_event event; 98 memset(&event, 0, sizeof(event)); 99 event.events = EPOLLIN; 100 event.data.ptr = NULL; 101 if (epoll_ctl(ret->epoll_fd, EPOLL_CTL_ADD, ret->event_fd, &event) == -1) { 102 LOG_ERROR(LOG_TAG, "%s unable to register eventfd with epoll set: %s", 103 __func__, strerror(errno)); 104 goto error; 105 } 106 107 return ret; 108 109 error:; 110 reactor_free(ret); 111 return NULL; 112 } 113 114 void reactor_free(reactor_t* reactor) { 115 if (!reactor) return; 116 117 list_free(reactor->invalidation_list); 118 close(reactor->event_fd); 119 close(reactor->epoll_fd); 120 osi_free(reactor); 121 } 122 123 reactor_status_t reactor_start(reactor_t* reactor) { 124 CHECK(reactor != NULL); 125 return run_reactor(reactor, 0); 126 } 127 128 reactor_status_t reactor_run_once(reactor_t* reactor) { 129 CHECK(reactor != NULL); 130 return run_reactor(reactor, 1); 131 } 132 133 void reactor_stop(reactor_t* reactor) { 134 CHECK(reactor != NULL); 135 136 eventfd_write(reactor->event_fd, EVENT_REACTOR_STOP); 137 } 138 139 reactor_object_t* reactor_register(reactor_t* reactor, int fd, void* context, 140 void (*read_ready)(void* context), 141 void (*write_ready)(void* context)) { 142 CHECK(reactor != NULL); 143 CHECK(fd != INVALID_FD); 144 145 reactor_object_t* object = 146 (reactor_object_t*)osi_calloc(sizeof(reactor_object_t)); 147 148 object->reactor = reactor; 149 object->fd = fd; 150 object->context = context; 151 object->read_ready = read_ready; 152 object->write_ready = write_ready; 153 object->mutex = new std::mutex; 154 155 struct epoll_event event; 156 memset(&event, 0, sizeof(event)); 157 if (read_ready) event.events |= (EPOLLIN | EPOLLRDHUP); 158 if (write_ready) event.events |= EPOLLOUT; 159 event.data.ptr = object; 160 161 if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) { 162 LOG_ERROR(LOG_TAG, "%s unable to register fd %d to epoll set: %s", __func__, 163 fd, strerror(errno)); 164 delete object->mutex; 165 osi_free(object); 166 return NULL; 167 } 168 169 return object; 170 } 171 172 bool reactor_change_registration(reactor_object_t* object, 173 void (*read_ready)(void* context), 174 void (*write_ready)(void* context)) { 175 CHECK(object != NULL); 176 177 struct epoll_event event; 178 memset(&event, 0, sizeof(event)); 179 if (read_ready) event.events |= (EPOLLIN | EPOLLRDHUP); 180 if (write_ready) event.events |= EPOLLOUT; 181 event.data.ptr = object; 182 183 if (epoll_ctl(object->reactor->epoll_fd, EPOLL_CTL_MOD, object->fd, &event) == 184 -1) { 185 LOG_ERROR(LOG_TAG, "%s unable to modify interest set for fd %d: %s", 186 __func__, object->fd, strerror(errno)); 187 return false; 188 } 189 190 std::lock_guard<std::mutex> lock(*object->mutex); 191 object->read_ready = read_ready; 192 object->write_ready = write_ready; 193 194 return true; 195 } 196 197 void reactor_unregister(reactor_object_t* obj) { 198 CHECK(obj != NULL); 199 200 reactor_t* reactor = obj->reactor; 201 202 if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_DEL, obj->fd, NULL) == -1) 203 LOG_ERROR(LOG_TAG, "%s unable to unregister fd %d from epoll set: %s", 204 __func__, obj->fd, strerror(errno)); 205 206 if (reactor->is_running && 207 pthread_equal(pthread_self(), reactor->run_thread)) { 208 reactor->object_removed = true; 209 return; 210 } 211 212 { 213 std::unique_lock<std::mutex> lock(*reactor->list_mutex); 214 list_append(reactor->invalidation_list, obj); 215 } 216 217 // Taking the object lock here makes sure a callback for |obj| isn't 218 // currently executing. The reactor thread must then either be before 219 // the callbacks or after. If after, we know that the object won't be 220 // referenced because it has been taken out of the epoll set. If before, 221 // it won't be referenced because the reactor thread will check the 222 // invalidation_list and find it in there. So by taking this lock, we 223 // are waiting until the reactor thread drops all references to |obj|. 224 // One the wait completes, we can unlock and destroy |obj| safely. 225 obj->mutex->lock(); 226 obj->mutex->unlock(); 227 delete obj->mutex; 228 osi_free(obj); 229 } 230 231 // Runs the reactor loop for a maximum of |iterations|. 232 // 0 |iterations| means loop forever. 233 // |reactor| may not be NULL. 234 static reactor_status_t run_reactor(reactor_t* reactor, int iterations) { 235 CHECK(reactor != NULL); 236 237 reactor->run_thread = pthread_self(); 238 reactor->is_running = true; 239 240 struct epoll_event events[MAX_EVENTS]; 241 for (int i = 0; iterations == 0 || i < iterations; ++i) { 242 { 243 std::lock_guard<std::mutex> lock(*reactor->list_mutex); 244 list_clear(reactor->invalidation_list); 245 } 246 247 int ret; 248 OSI_NO_INTR(ret = epoll_wait(reactor->epoll_fd, events, MAX_EVENTS, -1)); 249 if (ret == -1) { 250 LOG_ERROR(LOG_TAG, "%s error in epoll_wait: %s", __func__, 251 strerror(errno)); 252 reactor->is_running = false; 253 return REACTOR_STATUS_ERROR; 254 } 255 256 for (int j = 0; j < ret; ++j) { 257 // The event file descriptor is the only one that registers with 258 // a NULL data pointer. We use the NULL to identify it and break 259 // out of the reactor loop. 260 if (events[j].data.ptr == NULL) { 261 eventfd_t value; 262 eventfd_read(reactor->event_fd, &value); 263 reactor->is_running = false; 264 return REACTOR_STATUS_STOP; 265 } 266 267 reactor_object_t* object = (reactor_object_t*)events[j].data.ptr; 268 269 std::unique_lock<std::mutex> lock(*reactor->list_mutex); 270 if (list_contains(reactor->invalidation_list, object)) { 271 continue; 272 } 273 274 // Downgrade the list lock to an object lock. 275 { 276 std::lock_guard<std::mutex> obj_lock(*object->mutex); 277 lock.unlock(); 278 279 reactor->object_removed = false; 280 if (events[j].events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) && 281 object->read_ready) 282 object->read_ready(object->context); 283 if (!reactor->object_removed && events[j].events & EPOLLOUT && 284 object->write_ready) 285 object->write_ready(object->context); 286 } 287 288 if (reactor->object_removed) { 289 delete object->mutex; 290 osi_free(object); 291 } 292 } 293 } 294 295 reactor->is_running = false; 296 return REACTOR_STATUS_DONE; 297 } 298