Home | History | Annotate | Download | only in src
      1 /******************************************************************************
      2  *
      3  *  Copyright (C) 2014 Google, Inc.
      4  *
      5  *  Licensed under the Apache License, Version 2.0 (the "License");
      6  *  you may not use this file except in compliance with the License.
      7  *  You may obtain a copy of the License at:
      8  *
      9  *  http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  *  Unless required by applicable law or agreed to in writing, software
     12  *  distributed under the License is distributed on an "AS IS" BASIS,
     13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  *  See the License for the specific language governing permissions and
     15  *  limitations under the License.
     16  *
     17  ******************************************************************************/
     18 
     19 #define LOG_TAG "bt_osi_reactor"
     20 
     21 #include "osi/include/reactor.h"
     22 
     23 #include <assert.h>
     24 #include <errno.h>
     25 #include <pthread.h>
     26 #include <stdlib.h>
     27 #include <string.h>
     28 #include <sys/epoll.h>
     29 #include <sys/eventfd.h>
     30 #include <unistd.h>
     31 
     32 #include "osi/include/allocator.h"
     33 #include "osi/include/list.h"
     34 #include "osi/include/log.h"
     35 
     36 #if !defined(EFD_SEMAPHORE)
     37 #  define EFD_SEMAPHORE (1 << 0)
     38 #endif
     39 
     40 struct reactor_t {
     41   int epoll_fd;
     42   int event_fd;
     43   pthread_mutex_t list_lock;  // protects invalidation_list.
     44   list_t *invalidation_list;  // reactor objects that have been unregistered.
     45   pthread_t run_thread;       // the pthread on which reactor_run is executing.
     46   bool is_running;            // indicates whether |run_thread| is valid.
     47   bool object_removed;
     48 };
     49 
     50 struct reactor_object_t {
     51   int fd;                              // the file descriptor to monitor for events.
     52   void *context;                       // a context that's passed back to the *_ready functions.
     53   reactor_t *reactor;                  // the reactor instance this object is registered with.
     54   pthread_mutex_t lock;                // protects the lifetime of this object and all variables.
     55 
     56   void (*read_ready)(void *context);   // function to call when the file descriptor becomes readable.
     57   void (*write_ready)(void *context);  // function to call when the file descriptor becomes writeable.
     58 };
     59 
     60 static reactor_status_t run_reactor(reactor_t *reactor, int iterations);
     61 
     62 static const size_t MAX_EVENTS = 64;
     63 static const eventfd_t EVENT_REACTOR_STOP = 1;
     64 
     65 reactor_t *reactor_new(void) {
     66   reactor_t *ret = (reactor_t *)osi_calloc(sizeof(reactor_t));
     67 
     68   ret->epoll_fd = INVALID_FD;
     69   ret->event_fd = INVALID_FD;
     70 
     71   ret->epoll_fd = epoll_create(MAX_EVENTS);
     72   if (ret->epoll_fd == INVALID_FD) {
     73     LOG_ERROR(LOG_TAG, "%s unable to create epoll instance: %s", __func__, strerror(errno));
     74     goto error;
     75   }
     76 
     77   ret->event_fd = eventfd(0, 0);
     78   if (ret->event_fd == INVALID_FD) {
     79     LOG_ERROR(LOG_TAG, "%s unable to create eventfd: %s", __func__, strerror(errno));
     80     goto error;
     81   }
     82 
     83   pthread_mutex_init(&ret->list_lock, NULL);
     84   ret->invalidation_list = list_new(NULL);
     85   if (!ret->invalidation_list) {
     86     LOG_ERROR(LOG_TAG, "%s unable to allocate object invalidation list.", __func__);
     87     goto error;
     88   }
     89 
     90   struct epoll_event event;
     91   memset(&event, 0, sizeof(event));
     92   event.events = EPOLLIN;
     93   event.data.ptr = NULL;
     94   if (epoll_ctl(ret->epoll_fd, EPOLL_CTL_ADD, ret->event_fd, &event) == -1) {
     95     LOG_ERROR(LOG_TAG, "%s unable to register eventfd with epoll set: %s", __func__, strerror(errno));
     96     goto error;
     97   }
     98 
     99   return ret;
    100 
    101 error:;
    102   reactor_free(ret);
    103   return NULL;
    104 }
    105 
    106 void reactor_free(reactor_t *reactor) {
    107   if (!reactor)
    108     return;
    109 
    110   list_free(reactor->invalidation_list);
    111   close(reactor->event_fd);
    112   close(reactor->epoll_fd);
    113   osi_free(reactor);
    114 }
    115 
    116 reactor_status_t reactor_start(reactor_t *reactor) {
    117   assert(reactor != NULL);
    118   return run_reactor(reactor, 0);
    119 }
    120 
    121 reactor_status_t reactor_run_once(reactor_t *reactor) {
    122   assert(reactor != NULL);
    123   return run_reactor(reactor, 1);
    124 }
    125 
    126 void reactor_stop(reactor_t *reactor) {
    127   assert(reactor != NULL);
    128 
    129   eventfd_write(reactor->event_fd, EVENT_REACTOR_STOP);
    130 }
    131 
    132 reactor_object_t *reactor_register(reactor_t *reactor,
    133     int fd, void *context,
    134     void (*read_ready)(void *context),
    135     void (*write_ready)(void *context)) {
    136   assert(reactor != NULL);
    137   assert(fd != INVALID_FD);
    138 
    139   reactor_object_t *object =
    140       (reactor_object_t *)osi_calloc(sizeof(reactor_object_t));
    141 
    142   object->reactor = reactor;
    143   object->fd = fd;
    144   object->context = context;
    145   object->read_ready = read_ready;
    146   object->write_ready = write_ready;
    147   pthread_mutex_init(&object->lock, NULL);
    148 
    149   struct epoll_event event;
    150   memset(&event, 0, sizeof(event));
    151   if (read_ready)
    152     event.events |= (EPOLLIN | EPOLLRDHUP);
    153   if (write_ready)
    154     event.events |= EPOLLOUT;
    155   event.data.ptr = object;
    156 
    157   if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) {
    158     LOG_ERROR(LOG_TAG, "%s unable to register fd %d to epoll set: %s", __func__, fd, strerror(errno));
    159     pthread_mutex_destroy(&object->lock);
    160     osi_free(object);
    161     return NULL;
    162   }
    163 
    164   return object;
    165 }
    166 
    167 bool reactor_change_registration(reactor_object_t *object,
    168     void (*read_ready)(void *context),
    169     void (*write_ready)(void *context)) {
    170   assert(object != NULL);
    171 
    172   struct epoll_event event;
    173   memset(&event, 0, sizeof(event));
    174   if (read_ready)
    175     event.events |= (EPOLLIN | EPOLLRDHUP);
    176   if (write_ready)
    177     event.events |= EPOLLOUT;
    178   event.data.ptr = object;
    179 
    180   if (epoll_ctl(object->reactor->epoll_fd, EPOLL_CTL_MOD, object->fd, &event) == -1) {
    181     LOG_ERROR(LOG_TAG, "%s unable to modify interest set for fd %d: %s", __func__, object->fd, strerror(errno));
    182     return false;
    183   }
    184 
    185   pthread_mutex_lock(&object->lock);
    186   object->read_ready = read_ready;
    187   object->write_ready = write_ready;
    188   pthread_mutex_unlock(&object->lock);
    189 
    190   return true;
    191 }
    192 
    193 void reactor_unregister(reactor_object_t *obj) {
    194   assert(obj != NULL);
    195 
    196   reactor_t *reactor = obj->reactor;
    197 
    198   if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_DEL, obj->fd, NULL) == -1)
    199     LOG_ERROR(LOG_TAG, "%s unable to unregister fd %d from epoll set: %s", __func__, obj->fd, strerror(errno));
    200 
    201   if (reactor->is_running && pthread_equal(pthread_self(), reactor->run_thread)) {
    202     reactor->object_removed = true;
    203     return;
    204   }
    205 
    206   pthread_mutex_lock(&reactor->list_lock);
    207   list_append(reactor->invalidation_list, obj);
    208   pthread_mutex_unlock(&reactor->list_lock);
    209 
    210   // Taking the object lock here makes sure a callback for |obj| isn't
    211   // currently executing. The reactor thread must then either be before
    212   // the callbacks or after. If after, we know that the object won't be
    213   // referenced because it has been taken out of the epoll set. If before,
    214   // it won't be referenced because the reactor thread will check the
    215   // invalidation_list and find it in there. So by taking this lock, we
    216   // are waiting until the reactor thread drops all references to |obj|.
    217   // One the wait completes, we can unlock and destroy |obj| safely.
    218   pthread_mutex_lock(&obj->lock);
    219   pthread_mutex_unlock(&obj->lock);
    220   pthread_mutex_destroy(&obj->lock);
    221   osi_free(obj);
    222 }
    223 
    224 // Runs the reactor loop for a maximum of |iterations|.
    225 // 0 |iterations| means loop forever.
    226 // |reactor| may not be NULL.
    227 static reactor_status_t run_reactor(reactor_t *reactor, int iterations) {
    228   assert(reactor != NULL);
    229 
    230   reactor->run_thread = pthread_self();
    231   reactor->is_running = true;
    232 
    233   struct epoll_event events[MAX_EVENTS];
    234   for (int i = 0; iterations == 0 || i < iterations; ++i) {
    235     pthread_mutex_lock(&reactor->list_lock);
    236     list_clear(reactor->invalidation_list);
    237     pthread_mutex_unlock(&reactor->list_lock);
    238 
    239     int ret;
    240     OSI_NO_INTR(ret = epoll_wait(reactor->epoll_fd, events, MAX_EVENTS, -1));
    241     if (ret == -1) {
    242       LOG_ERROR(LOG_TAG, "%s error in epoll_wait: %s", __func__, strerror(errno));
    243       reactor->is_running = false;
    244       return REACTOR_STATUS_ERROR;
    245     }
    246 
    247     for (int j = 0; j < ret; ++j) {
    248       // The event file descriptor is the only one that registers with
    249       // a NULL data pointer. We use the NULL to identify it and break
    250       // out of the reactor loop.
    251       if (events[j].data.ptr == NULL) {
    252         eventfd_t value;
    253         eventfd_read(reactor->event_fd, &value);
    254         reactor->is_running = false;
    255         return REACTOR_STATUS_STOP;
    256       }
    257 
    258       reactor_object_t *object = (reactor_object_t *)events[j].data.ptr;
    259 
    260       pthread_mutex_lock(&reactor->list_lock);
    261       if (list_contains(reactor->invalidation_list, object)) {
    262         pthread_mutex_unlock(&reactor->list_lock);
    263         continue;
    264       }
    265 
    266       // Downgrade the list lock to an object lock.
    267       pthread_mutex_lock(&object->lock);
    268       pthread_mutex_unlock(&reactor->list_lock);
    269 
    270       reactor->object_removed = false;
    271       if (events[j].events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) && object->read_ready)
    272         object->read_ready(object->context);
    273       if (!reactor->object_removed && events[j].events & EPOLLOUT && object->write_ready)
    274         object->write_ready(object->context);
    275       pthread_mutex_unlock(&object->lock);
    276 
    277       if (reactor->object_removed) {
    278         pthread_mutex_destroy(&object->lock);
    279         osi_free(object);
    280       }
    281     }
    282   }
    283 
    284   reactor->is_running = false;
    285   return REACTOR_STATUS_DONE;
    286 }
    287