Home | History | Annotate | Download | only in libpdx_uds
      1 #include "uds/service_dispatcher.h"
      2 
      3 #include <errno.h>
      4 #include <log/log.h>
      5 #include <sys/epoll.h>
      6 #include <sys/eventfd.h>
      7 
      8 #include "pdx/service.h"
      9 #include "uds/service_endpoint.h"
     10 
     // Capacity of the epoll_event buffers handed to epoll_wait() by the dispatch
     // routines in this file, i.e. the most events one wait call can return.
     static constexpr int kMaxEventsPerLoop = 128;
     12 
     13 namespace android {
     14 namespace pdx {
     15 namespace uds {
     16 
     17 std::unique_ptr<pdx::ServiceDispatcher> ServiceDispatcher::Create() {
     18   std::unique_ptr<ServiceDispatcher> dispatcher{new ServiceDispatcher()};
     19   if (!dispatcher->epoll_fd_ || !dispatcher->event_fd_) {
     20     dispatcher.reset();
     21   }
     22 
     23   return std::move(dispatcher);
     24 }
     25 
     26 ServiceDispatcher::ServiceDispatcher() {
     27   event_fd_.Reset(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK));
     28   if (!event_fd_) {
     29     ALOGE("Failed to create event fd because: %s\n", strerror(errno));
     30     return;
     31   }
     32 
     33   epoll_fd_.Reset(epoll_create1(EPOLL_CLOEXEC));
     34   if (!epoll_fd_) {
     35     ALOGE("Failed to create epoll fd because: %s\n", strerror(errno));
     36     return;
     37   }
     38 
     39   // Use "this" as a unique pointer to distinguish the event fd from all
     40   // the other entries that point to instances of Service.
     41   epoll_event event;
     42   event.events = EPOLLIN;
     43   event.data.ptr = this;
     44 
     45   if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_ADD, event_fd_.Get(), &event) < 0) {
     46     ALOGE("Failed to add event fd to epoll fd because: %s\n", strerror(errno));
     47 
     48     // Close the fds here and signal failure to the factory method.
     49     event_fd_.Close();
     50     epoll_fd_.Close();
     51   }
     52 }
     53 
     54 ServiceDispatcher::~ServiceDispatcher() { SetCanceled(true); }
     55 
     56 int ServiceDispatcher::ThreadEnter() {
     57   std::lock_guard<std::mutex> autolock(mutex_);
     58 
     59   if (canceled_)
     60     return -EBUSY;
     61 
     62   thread_count_++;
     63   return 0;
     64 }
     65 
     66 void ServiceDispatcher::ThreadExit() {
     67   std::lock_guard<std::mutex> autolock(mutex_);
     68   thread_count_--;
     69   condition_.notify_one();
     70 }
     71 
     72 int ServiceDispatcher::AddService(const std::shared_ptr<Service>& service) {
     73   if (service->endpoint()->GetIpcTag() != Endpoint::kIpcTag)
     74     return -EINVAL;
     75 
     76   std::lock_guard<std::mutex> autolock(mutex_);
     77 
     78   auto* endpoint = static_cast<Endpoint*>(service->endpoint());
     79   epoll_event event;
     80   event.events = EPOLLIN;
     81   event.data.ptr = service.get();
     82 
     83   if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_ADD, endpoint->epoll_fd(), &event) <
     84       0) {
     85     ALOGE("Failed to add service to dispatcher because: %s\n", strerror(errno));
     86     return -errno;
     87   }
     88 
     89   services_.push_back(service);
     90   return 0;
     91 }
     92 
     93 int ServiceDispatcher::RemoveService(const std::shared_ptr<Service>& service) {
     94   if (service->endpoint()->GetIpcTag() != Endpoint::kIpcTag)
     95     return -EINVAL;
     96 
     97   std::lock_guard<std::mutex> autolock(mutex_);
     98 
     99   // It's dangerous to remove a service while other threads may be using it.
    100   if (thread_count_ > 0)
    101     return -EBUSY;
    102 
    103   epoll_event dummy;  // See BUGS in man 2 epoll_ctl.
    104 
    105   auto* endpoint = static_cast<Endpoint*>(service->endpoint());
    106   if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_DEL, endpoint->epoll_fd(), &dummy) <
    107       0) {
    108     ALOGE("Failed to remove service from dispatcher because: %s\n",
    109           strerror(errno));
    110     return -errno;
    111   }
    112 
    113   services_.remove(service);
    114   return 0;
    115 }
    116 
    117 int ServiceDispatcher::ReceiveAndDispatch() { return ReceiveAndDispatch(-1); }
    118 
    119 int ServiceDispatcher::ReceiveAndDispatch(int timeout) {
    120   int ret = ThreadEnter();
    121   if (ret < 0)
    122     return ret;
    123 
    124   epoll_event events[kMaxEventsPerLoop];
    125 
    126   int count = epoll_wait(epoll_fd_.Get(), events, kMaxEventsPerLoop, timeout);
    127   if (count <= 0) {
    128     ALOGE_IF(count < 0, "Failed to wait for epoll events because: %s\n",
    129              strerror(errno));
    130     ThreadExit();
    131     return count < 0 ? -errno : -ETIMEDOUT;
    132   }
    133 
    134   for (int i = 0; i < count; i++) {
    135     if (events[i].data.ptr == this) {
    136       ThreadExit();
    137       return -EBUSY;
    138     } else {
    139       Service* service = static_cast<Service*>(events[i].data.ptr);
    140 
    141       ALOGI_IF(TRACE, "Dispatching message: fd=%d\n",
    142                static_cast<Endpoint*>(service->endpoint())->epoll_fd());
    143       service->ReceiveAndDispatch();
    144     }
    145   }
    146 
    147   ThreadExit();
    148   return 0;
    149 }
    150 
    151 int ServiceDispatcher::EnterDispatchLoop() {
    152   int ret = ThreadEnter();
    153   if (ret < 0)
    154     return ret;
    155 
    156   epoll_event events[kMaxEventsPerLoop];
    157 
    158   while (!IsCanceled()) {
    159     int count = epoll_wait(epoll_fd_.Get(), events, kMaxEventsPerLoop, -1);
    160     if (count < 0 && errno != EINTR) {
    161       ALOGE("Failed to wait for epoll events because: %s\n", strerror(errno));
    162       ThreadExit();
    163       return -errno;
    164     }
    165 
    166     for (int i = 0; i < count; i++) {
    167       if (events[i].data.ptr == this) {
    168         ThreadExit();
    169         return -EBUSY;
    170       } else {
    171         Service* service = static_cast<Service*>(events[i].data.ptr);
    172 
    173         ALOGI_IF(TRACE, "Dispatching message: fd=%d\n",
    174                  static_cast<Endpoint*>(service->endpoint())->epoll_fd());
    175         service->ReceiveAndDispatch();
    176       }
    177     }
    178   }
    179 
    180   ThreadExit();
    181   return 0;
    182 }
    183 
    184 void ServiceDispatcher::SetCanceled(bool cancel) {
    185   std::unique_lock<std::mutex> lock(mutex_);
    186   canceled_ = cancel;
    187 
    188   if (canceled_ && thread_count_ > 0) {
    189     eventfd_write(event_fd_.Get(), 1);  // Signal threads to quit.
    190 
    191     condition_.wait(lock, [this] { return !(canceled_ && thread_count_ > 0); });
    192 
    193     eventfd_t value;
    194     eventfd_read(event_fd_.Get(), &value);  // Unsignal.
    195   }
    196 }
    197 
    198 bool ServiceDispatcher::IsCanceled() const { return canceled_; }
    199 
    200 }  // namespace uds
    201 }  // namespace pdx
    202 }  // namespace android
    203