Home | History | Annotate | Download | only in libpdx
      1 #include <pdx/service_dispatcher.h>
      2 
      3 #include <errno.h>
      4 #include <log/log.h>
      5 #include <sys/epoll.h>
      6 #include <sys/eventfd.h>
      7 
      8 #include <pdx/service.h>
      9 #include <pdx/service_endpoint.h>
     10 
     11 static const int kMaxEventsPerLoop = 128;
     12 
     13 namespace android {
     14 namespace pdx {
     15 
     16 std::unique_ptr<ServiceDispatcher> ServiceDispatcher::Create() {
     17   std::unique_ptr<ServiceDispatcher> dispatcher{new ServiceDispatcher()};
     18   if (!dispatcher->epoll_fd_ || !dispatcher->event_fd_) {
     19     dispatcher.reset();
     20   }
     21 
     22   return dispatcher;
     23 }
     24 
     25 ServiceDispatcher::ServiceDispatcher() {
     26   event_fd_.Reset(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK));
     27   if (!event_fd_) {
     28     ALOGE("Failed to create event fd because: %s\n", strerror(errno));
     29     return;
     30   }
     31 
     32   epoll_fd_.Reset(epoll_create1(EPOLL_CLOEXEC));
     33   if (!epoll_fd_) {
     34     ALOGE("Failed to create epoll fd because: %s\n", strerror(errno));
     35     return;
     36   }
     37 
     38   // Use "this" as a unique pointer to distinguish the event fd from all
     39   // the other entries that point to instances of Service.
     40   epoll_event event;
     41   event.events = EPOLLIN;
     42   event.data.ptr = this;
     43 
     44   if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_ADD, event_fd_.Get(), &event) < 0) {
     45     ALOGE("Failed to add event fd to epoll fd because: %s\n", strerror(errno));
     46 
     47     // Close the fds here and signal failure to the factory method.
     48     event_fd_.Close();
     49     epoll_fd_.Close();
     50   }
     51 }
     52 
     53 ServiceDispatcher::~ServiceDispatcher() { SetCanceled(true); }
     54 
     55 int ServiceDispatcher::ThreadEnter() {
     56   std::lock_guard<std::mutex> autolock(mutex_);
     57 
     58   if (canceled_)
     59     return -EBUSY;
     60 
     61   thread_count_++;
     62   return 0;
     63 }
     64 
     65 void ServiceDispatcher::ThreadExit() {
     66   std::lock_guard<std::mutex> autolock(mutex_);
     67   thread_count_--;
     68   condition_.notify_one();
     69 }
     70 
     71 int ServiceDispatcher::AddService(const std::shared_ptr<Service>& service) {
     72   std::lock_guard<std::mutex> autolock(mutex_);
     73 
     74   epoll_event event;
     75   event.events = EPOLLIN;
     76   event.data.ptr = service.get();
     77 
     78   if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_ADD, service->endpoint()->epoll_fd(),
     79                 &event) < 0) {
     80     ALOGE("Failed to add service to dispatcher because: %s\n", strerror(errno));
     81     return -errno;
     82   }
     83 
     84   services_.push_back(service);
     85   return 0;
     86 }
     87 
     88 int ServiceDispatcher::RemoveService(const std::shared_ptr<Service>& service) {
     89   std::lock_guard<std::mutex> autolock(mutex_);
     90 
     91   // It's dangerous to remove a service while other threads may be using it.
     92   if (thread_count_ > 0)
     93     return -EBUSY;
     94 
     95   epoll_event dummy;  // See BUGS in man 2 epoll_ctl.
     96   if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_DEL, service->endpoint()->epoll_fd(),
     97                 &dummy) < 0) {
     98     ALOGE("Failed to remove service from dispatcher because: %s\n",
     99           strerror(errno));
    100     return -errno;
    101   }
    102 
    103   services_.erase(std::remove(services_.begin(), services_.end(), service),
    104                   services_.end());
    105   return 0;
    106 }
    107 
    108 int ServiceDispatcher::ReceiveAndDispatch() { return ReceiveAndDispatch(-1); }
    109 
    110 int ServiceDispatcher::ReceiveAndDispatch(int timeout) {
    111   int ret = ThreadEnter();
    112   if (ret < 0)
    113     return ret;
    114 
    115   epoll_event events[kMaxEventsPerLoop];
    116 
    117   int count = epoll_wait(epoll_fd_.Get(), events, kMaxEventsPerLoop, timeout);
    118   if (count <= 0) {
    119     ALOGE_IF(count < 0, "Failed to wait for epoll events because: %s\n",
    120              strerror(errno));
    121     ThreadExit();
    122     return count < 0 ? -errno : -ETIMEDOUT;
    123   }
    124 
    125   for (int i = 0; i < count; i++) {
    126     if (events[i].data.ptr == this) {
    127       ThreadExit();
    128       return -EBUSY;
    129     } else {
    130       Service* service = static_cast<Service*>(events[i].data.ptr);
    131 
    132       ALOGI_IF(TRACE, "Dispatching message: fd=%d\n",
    133                service->endpoint()->epoll_fd());
    134       service->ReceiveAndDispatch();
    135     }
    136   }
    137 
    138   ThreadExit();
    139   return 0;
    140 }
    141 
    142 int ServiceDispatcher::EnterDispatchLoop() {
    143   int ret = ThreadEnter();
    144   if (ret < 0)
    145     return ret;
    146 
    147   epoll_event events[kMaxEventsPerLoop];
    148 
    149   while (!IsCanceled()) {
    150     int count = epoll_wait(epoll_fd_.Get(), events, kMaxEventsPerLoop, -1);
    151     if (count < 0 && errno != EINTR) {
    152       ALOGE("Failed to wait for epoll events because: %s\n", strerror(errno));
    153       ThreadExit();
    154       return -errno;
    155     }
    156 
    157     for (int i = 0; i < count; i++) {
    158       if (events[i].data.ptr == this) {
    159         ThreadExit();
    160         return -EBUSY;
    161       } else {
    162         Service* service = static_cast<Service*>(events[i].data.ptr);
    163 
    164         ALOGI_IF(TRACE, "Dispatching message: fd=%d\n",
    165                  service->endpoint()->epoll_fd());
    166         service->ReceiveAndDispatch();
    167       }
    168     }
    169   }
    170 
    171   ThreadExit();
    172   return 0;
    173 }
    174 
    175 void ServiceDispatcher::SetCanceled(bool cancel) {
    176   std::unique_lock<std::mutex> lock(mutex_);
    177   canceled_ = cancel;
    178 
    179   if (canceled_ && thread_count_ > 0) {
    180     eventfd_write(event_fd_.Get(), 1);  // Signal threads to quit.
    181 
    182     condition_.wait(lock, [this] { return !(canceled_ && thread_count_ > 0); });
    183 
    184     eventfd_t value;
    185     eventfd_read(event_fd_.Get(), &value);  // Unsignal.
    186   }
    187 }
    188 
    189 bool ServiceDispatcher::IsCanceled() const { return canceled_; }
    190 
    191 }  // namespace pdx
    192 }  // namespace android
    193