Home | History | Annotate | Download | only in src
      1 /******************************************************************************
      2  *
      3  *  Copyright 2014 Google, Inc.
      4  *
      5  *  Licensed under the Apache License, Version 2.0 (the "License");
      6  *  you may not use this file except in compliance with the License.
      7  *  You may obtain a copy of the License at:
      8  *
      9  *  http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  *  Unless required by applicable law or agreed to in writing, software
     12  *  distributed under the License is distributed on an "AS IS" BASIS,
     13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  *  See the License for the specific language governing permissions and
     15  *  limitations under the License.
     16  *
     17  ******************************************************************************/
     18 
     19 #define LOG_TAG "bt_osi_reactor"
     20 
     21 #include "osi/include/reactor.h"
     22 
     23 #include <base/logging.h>
     24 #include <errno.h>
     25 #include <pthread.h>
     26 #include <stdlib.h>
     27 #include <string.h>
     28 #include <sys/epoll.h>
     29 #include <sys/eventfd.h>
     30 #include <unistd.h>
     31 
     32 #include <mutex>
     33 
     34 #include "osi/include/allocator.h"
     35 #include "osi/include/list.h"
     36 #include "osi/include/log.h"
     37 
     38 #if !defined(EFD_SEMAPHORE)
     39 #define EFD_SEMAPHORE (1 << 0)
     40 #endif
     41 
     42 struct reactor_t {
     43   int epoll_fd;
     44   int event_fd;
     45   std::mutex* list_mutex;
     46   list_t* invalidation_list;  // reactor objects that have been unregistered.
     47   pthread_t run_thread;       // the pthread on which reactor_run is executing.
     48   bool is_running;            // indicates whether |run_thread| is valid.
     49   bool object_removed;
     50 };
     51 
     52 struct reactor_object_t {
     53   int fd;              // the file descriptor to monitor for events.
     54   void* context;       // a context that's passed back to the *_ready functions.
     55   reactor_t* reactor;  // the reactor instance this object is registered with.
     56   std::mutex* mutex;  // protects the lifetime of this object and all variables.
     57 
     58   void (*read_ready)(void* context);   // function to call when the file
     59                                        // descriptor becomes readable.
     60   void (*write_ready)(void* context);  // function to call when the file
     61                                        // descriptor becomes writeable.
     62 };
     63 
     64 static reactor_status_t run_reactor(reactor_t* reactor, int iterations);
     65 
     66 static const size_t MAX_EVENTS = 64;
     67 static const eventfd_t EVENT_REACTOR_STOP = 1;
     68 
     69 reactor_t* reactor_new(void) {
     70   reactor_t* ret = (reactor_t*)osi_calloc(sizeof(reactor_t));
     71 
     72   ret->epoll_fd = INVALID_FD;
     73   ret->event_fd = INVALID_FD;
     74 
     75   ret->epoll_fd = epoll_create(MAX_EVENTS);
     76   if (ret->epoll_fd == INVALID_FD) {
     77     LOG_ERROR(LOG_TAG, "%s unable to create epoll instance: %s", __func__,
     78               strerror(errno));
     79     goto error;
     80   }
     81 
     82   ret->event_fd = eventfd(0, 0);
     83   if (ret->event_fd == INVALID_FD) {
     84     LOG_ERROR(LOG_TAG, "%s unable to create eventfd: %s", __func__,
     85               strerror(errno));
     86     goto error;
     87   }
     88 
     89   ret->list_mutex = new std::mutex;
     90   ret->invalidation_list = list_new(NULL);
     91   if (!ret->invalidation_list) {
     92     LOG_ERROR(LOG_TAG, "%s unable to allocate object invalidation list.",
     93               __func__);
     94     goto error;
     95   }
     96 
     97   struct epoll_event event;
     98   memset(&event, 0, sizeof(event));
     99   event.events = EPOLLIN;
    100   event.data.ptr = NULL;
    101   if (epoll_ctl(ret->epoll_fd, EPOLL_CTL_ADD, ret->event_fd, &event) == -1) {
    102     LOG_ERROR(LOG_TAG, "%s unable to register eventfd with epoll set: %s",
    103               __func__, strerror(errno));
    104     goto error;
    105   }
    106 
    107   return ret;
    108 
    109 error:;
    110   reactor_free(ret);
    111   return NULL;
    112 }
    113 
    114 void reactor_free(reactor_t* reactor) {
    115   if (!reactor) return;
    116 
    117   list_free(reactor->invalidation_list);
    118   close(reactor->event_fd);
    119   close(reactor->epoll_fd);
    120   osi_free(reactor);
    121 }
    122 
    123 reactor_status_t reactor_start(reactor_t* reactor) {
    124   CHECK(reactor != NULL);
    125   return run_reactor(reactor, 0);
    126 }
    127 
    128 reactor_status_t reactor_run_once(reactor_t* reactor) {
    129   CHECK(reactor != NULL);
    130   return run_reactor(reactor, 1);
    131 }
    132 
    133 void reactor_stop(reactor_t* reactor) {
    134   CHECK(reactor != NULL);
    135 
    136   eventfd_write(reactor->event_fd, EVENT_REACTOR_STOP);
    137 }
    138 
    139 reactor_object_t* reactor_register(reactor_t* reactor, int fd, void* context,
    140                                    void (*read_ready)(void* context),
    141                                    void (*write_ready)(void* context)) {
    142   CHECK(reactor != NULL);
    143   CHECK(fd != INVALID_FD);
    144 
    145   reactor_object_t* object =
    146       (reactor_object_t*)osi_calloc(sizeof(reactor_object_t));
    147 
    148   object->reactor = reactor;
    149   object->fd = fd;
    150   object->context = context;
    151   object->read_ready = read_ready;
    152   object->write_ready = write_ready;
    153   object->mutex = new std::mutex;
    154 
    155   struct epoll_event event;
    156   memset(&event, 0, sizeof(event));
    157   if (read_ready) event.events |= (EPOLLIN | EPOLLRDHUP);
    158   if (write_ready) event.events |= EPOLLOUT;
    159   event.data.ptr = object;
    160 
    161   if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) {
    162     LOG_ERROR(LOG_TAG, "%s unable to register fd %d to epoll set: %s", __func__,
    163               fd, strerror(errno));
    164     delete object->mutex;
    165     osi_free(object);
    166     return NULL;
    167   }
    168 
    169   return object;
    170 }
    171 
    172 bool reactor_change_registration(reactor_object_t* object,
    173                                  void (*read_ready)(void* context),
    174                                  void (*write_ready)(void* context)) {
    175   CHECK(object != NULL);
    176 
    177   struct epoll_event event;
    178   memset(&event, 0, sizeof(event));
    179   if (read_ready) event.events |= (EPOLLIN | EPOLLRDHUP);
    180   if (write_ready) event.events |= EPOLLOUT;
    181   event.data.ptr = object;
    182 
    183   if (epoll_ctl(object->reactor->epoll_fd, EPOLL_CTL_MOD, object->fd, &event) ==
    184       -1) {
    185     LOG_ERROR(LOG_TAG, "%s unable to modify interest set for fd %d: %s",
    186               __func__, object->fd, strerror(errno));
    187     return false;
    188   }
    189 
    190   std::lock_guard<std::mutex> lock(*object->mutex);
    191   object->read_ready = read_ready;
    192   object->write_ready = write_ready;
    193 
    194   return true;
    195 }
    196 
    197 void reactor_unregister(reactor_object_t* obj) {
    198   CHECK(obj != NULL);
    199 
    200   reactor_t* reactor = obj->reactor;
    201 
    202   if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_DEL, obj->fd, NULL) == -1)
    203     LOG_ERROR(LOG_TAG, "%s unable to unregister fd %d from epoll set: %s",
    204               __func__, obj->fd, strerror(errno));
    205 
    206   if (reactor->is_running &&
    207       pthread_equal(pthread_self(), reactor->run_thread)) {
    208     reactor->object_removed = true;
    209     return;
    210   }
    211 
    212   {
    213     std::unique_lock<std::mutex> lock(*reactor->list_mutex);
    214     list_append(reactor->invalidation_list, obj);
    215   }
    216 
    217   // Taking the object lock here makes sure a callback for |obj| isn't
    218   // currently executing. The reactor thread must then either be before
    219   // the callbacks or after. If after, we know that the object won't be
    220   // referenced because it has been taken out of the epoll set. If before,
    221   // it won't be referenced because the reactor thread will check the
    222   // invalidation_list and find it in there. So by taking this lock, we
    223   // are waiting until the reactor thread drops all references to |obj|.
    224   // One the wait completes, we can unlock and destroy |obj| safely.
    225   obj->mutex->lock();
    226   obj->mutex->unlock();
    227   delete obj->mutex;
    228   osi_free(obj);
    229 }
    230 
    231 // Runs the reactor loop for a maximum of |iterations|.
    232 // 0 |iterations| means loop forever.
    233 // |reactor| may not be NULL.
    234 static reactor_status_t run_reactor(reactor_t* reactor, int iterations) {
    235   CHECK(reactor != NULL);
    236 
    237   reactor->run_thread = pthread_self();
    238   reactor->is_running = true;
    239 
    240   struct epoll_event events[MAX_EVENTS];
    241   for (int i = 0; iterations == 0 || i < iterations; ++i) {
    242     {
    243       std::lock_guard<std::mutex> lock(*reactor->list_mutex);
    244       list_clear(reactor->invalidation_list);
    245     }
    246 
    247     int ret;
    248     OSI_NO_INTR(ret = epoll_wait(reactor->epoll_fd, events, MAX_EVENTS, -1));
    249     if (ret == -1) {
    250       LOG_ERROR(LOG_TAG, "%s error in epoll_wait: %s", __func__,
    251                 strerror(errno));
    252       reactor->is_running = false;
    253       return REACTOR_STATUS_ERROR;
    254     }
    255 
    256     for (int j = 0; j < ret; ++j) {
    257       // The event file descriptor is the only one that registers with
    258       // a NULL data pointer. We use the NULL to identify it and break
    259       // out of the reactor loop.
    260       if (events[j].data.ptr == NULL) {
    261         eventfd_t value;
    262         eventfd_read(reactor->event_fd, &value);
    263         reactor->is_running = false;
    264         return REACTOR_STATUS_STOP;
    265       }
    266 
    267       reactor_object_t* object = (reactor_object_t*)events[j].data.ptr;
    268 
    269       std::unique_lock<std::mutex> lock(*reactor->list_mutex);
    270       if (list_contains(reactor->invalidation_list, object)) {
    271         continue;
    272       }
    273 
    274       // Downgrade the list lock to an object lock.
    275       {
    276         std::lock_guard<std::mutex> obj_lock(*object->mutex);
    277         lock.unlock();
    278 
    279         reactor->object_removed = false;
    280         if (events[j].events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) &&
    281             object->read_ready)
    282           object->read_ready(object->context);
    283         if (!reactor->object_removed && events[j].events & EPOLLOUT &&
    284             object->write_ready)
    285           object->write_ready(object->context);
    286       }
    287 
    288       if (reactor->object_removed) {
    289         delete object->mutex;
    290         osi_free(object);
    291       }
    292     }
    293   }
    294 
    295   reactor->is_running = false;
    296   return REACTOR_STATUS_DONE;
    297 }
    298