1 #include <pdx/service_dispatcher.h> 2 3 #include <errno.h> 4 #include <log/log.h> 5 #include <sys/epoll.h> 6 #include <sys/eventfd.h> 7 8 #include <pdx/service.h> 9 #include <pdx/service_endpoint.h> 10 11 static const int kMaxEventsPerLoop = 128; 12 13 namespace android { 14 namespace pdx { 15 16 std::unique_ptr<ServiceDispatcher> ServiceDispatcher::Create() { 17 std::unique_ptr<ServiceDispatcher> dispatcher{new ServiceDispatcher()}; 18 if (!dispatcher->epoll_fd_ || !dispatcher->event_fd_) { 19 dispatcher.reset(); 20 } 21 22 return dispatcher; 23 } 24 25 ServiceDispatcher::ServiceDispatcher() { 26 event_fd_.Reset(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)); 27 if (!event_fd_) { 28 ALOGE("Failed to create event fd because: %s\n", strerror(errno)); 29 return; 30 } 31 32 epoll_fd_.Reset(epoll_create1(EPOLL_CLOEXEC)); 33 if (!epoll_fd_) { 34 ALOGE("Failed to create epoll fd because: %s\n", strerror(errno)); 35 return; 36 } 37 38 // Use "this" as a unique pointer to distinguish the event fd from all 39 // the other entries that point to instances of Service. 40 epoll_event event; 41 event.events = EPOLLIN; 42 event.data.ptr = this; 43 44 if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_ADD, event_fd_.Get(), &event) < 0) { 45 ALOGE("Failed to add event fd to epoll fd because: %s\n", strerror(errno)); 46 47 // Close the fds here and signal failure to the factory method. 48 event_fd_.Close(); 49 epoll_fd_.Close(); 50 } 51 } 52 53 ServiceDispatcher::~ServiceDispatcher() { SetCanceled(true); } 54 55 int ServiceDispatcher::ThreadEnter() { 56 std::lock_guard<std::mutex> autolock(mutex_); 57 58 if (canceled_) 59 return -EBUSY; 60 61 thread_count_++; 62 return 0; 63 } 64 65 void ServiceDispatcher::ThreadExit() { 66 std::lock_guard<std::mutex> autolock(mutex_); 67 thread_count_--; 68 condition_.notify_one(); 69 } 70 71 int ServiceDispatcher::AddService(const std::shared_ptr<Service>& service) { 72 std::lock_guard<std::mutex> autolock(mutex_); 73 74 epoll_event event; 75 event.events = EPOLLIN; 76 event.data.ptr = service.get(); 77 78 if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_ADD, service->endpoint()->epoll_fd(), 79 &event) < 0) { 80 ALOGE("Failed to add service to dispatcher because: %s\n", strerror(errno)); 81 return -errno; 82 } 83 84 services_.push_back(service); 85 return 0; 86 } 87 88 int ServiceDispatcher::RemoveService(const std::shared_ptr<Service>& service) { 89 std::lock_guard<std::mutex> autolock(mutex_); 90 91 // It's dangerous to remove a service while other threads may be using it. 92 if (thread_count_ > 0) 93 return -EBUSY; 94 95 epoll_event dummy; // See BUGS in man 2 epoll_ctl. 96 if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_DEL, service->endpoint()->epoll_fd(), 97 &dummy) < 0) { 98 ALOGE("Failed to remove service from dispatcher because: %s\n", 99 strerror(errno)); 100 return -errno; 101 } 102 103 services_.erase(std::remove(services_.begin(), services_.end(), service), 104 services_.end()); 105 return 0; 106 } 107 108 int ServiceDispatcher::ReceiveAndDispatch() { return ReceiveAndDispatch(-1); } 109 110 int ServiceDispatcher::ReceiveAndDispatch(int timeout) { 111 int ret = ThreadEnter(); 112 if (ret < 0) 113 return ret; 114 115 epoll_event events[kMaxEventsPerLoop]; 116 117 int count = epoll_wait(epoll_fd_.Get(), events, kMaxEventsPerLoop, timeout); 118 if (count <= 0) { 119 ALOGE_IF(count < 0, "Failed to wait for epoll events because: %s\n", 120 strerror(errno)); 121 ThreadExit(); 122 return count < 0 ? -errno : -ETIMEDOUT; 123 } 124 125 for (int i = 0; i < count; i++) { 126 if (events[i].data.ptr == this) { 127 ThreadExit(); 128 return -EBUSY; 129 } else { 130 Service* service = static_cast<Service*>(events[i].data.ptr); 131 132 ALOGI_IF(TRACE, "Dispatching message: fd=%d\n", 133 service->endpoint()->epoll_fd()); 134 service->ReceiveAndDispatch(); 135 } 136 } 137 138 ThreadExit(); 139 return 0; 140 } 141 142 int ServiceDispatcher::EnterDispatchLoop() { 143 int ret = ThreadEnter(); 144 if (ret < 0) 145 return ret; 146 147 epoll_event events[kMaxEventsPerLoop]; 148 149 while (!IsCanceled()) { 150 int count = epoll_wait(epoll_fd_.Get(), events, kMaxEventsPerLoop, -1); 151 if (count < 0 && errno != EINTR) { 152 ALOGE("Failed to wait for epoll events because: %s\n", strerror(errno)); 153 ThreadExit(); 154 return -errno; 155 } 156 157 for (int i = 0; i < count; i++) { 158 if (events[i].data.ptr == this) { 159 ThreadExit(); 160 return -EBUSY; 161 } else { 162 Service* service = static_cast<Service*>(events[i].data.ptr); 163 164 ALOGI_IF(TRACE, "Dispatching message: fd=%d\n", 165 service->endpoint()->epoll_fd()); 166 service->ReceiveAndDispatch(); 167 } 168 } 169 } 170 171 ThreadExit(); 172 return 0; 173 } 174 175 void ServiceDispatcher::SetCanceled(bool cancel) { 176 std::unique_lock<std::mutex> lock(mutex_); 177 canceled_ = cancel; 178 179 if (canceled_ && thread_count_ > 0) { 180 eventfd_write(event_fd_.Get(), 1); // Signal threads to quit. 181 182 condition_.wait(lock, [this] { return !(canceled_ && thread_count_ > 0); }); 183 184 eventfd_t value; 185 eventfd_read(event_fd_.Get(), &value); // Unsignal. 186 } 187 } 188 189 bool ServiceDispatcher::IsCanceled() const { return canceled_; } 190 191 } // namespace pdx 192 } // namespace android 193