1 #include "uds/service_dispatcher.h" 2 3 #include <errno.h> 4 #include <log/log.h> 5 #include <sys/epoll.h> 6 #include <sys/eventfd.h> 7 8 #include "pdx/service.h" 9 #include "uds/service_endpoint.h" 10 11 static const int kMaxEventsPerLoop = 128; 12 13 namespace android { 14 namespace pdx { 15 namespace uds { 16 17 std::unique_ptr<pdx::ServiceDispatcher> ServiceDispatcher::Create() { 18 std::unique_ptr<ServiceDispatcher> dispatcher{new ServiceDispatcher()}; 19 if (!dispatcher->epoll_fd_ || !dispatcher->event_fd_) { 20 dispatcher.reset(); 21 } 22 23 return std::move(dispatcher); 24 } 25 26 ServiceDispatcher::ServiceDispatcher() { 27 event_fd_.Reset(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)); 28 if (!event_fd_) { 29 ALOGE("Failed to create event fd because: %s\n", strerror(errno)); 30 return; 31 } 32 33 epoll_fd_.Reset(epoll_create1(EPOLL_CLOEXEC)); 34 if (!epoll_fd_) { 35 ALOGE("Failed to create epoll fd because: %s\n", strerror(errno)); 36 return; 37 } 38 39 // Use "this" as a unique pointer to distinguish the event fd from all 40 // the other entries that point to instances of Service. 41 epoll_event event; 42 event.events = EPOLLIN; 43 event.data.ptr = this; 44 45 if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_ADD, event_fd_.Get(), &event) < 0) { 46 ALOGE("Failed to add event fd to epoll fd because: %s\n", strerror(errno)); 47 48 // Close the fds here and signal failure to the factory method. 49 event_fd_.Close(); 50 epoll_fd_.Close(); 51 } 52 } 53 54 ServiceDispatcher::~ServiceDispatcher() { SetCanceled(true); } 55 56 int ServiceDispatcher::ThreadEnter() { 57 std::lock_guard<std::mutex> autolock(mutex_); 58 59 if (canceled_) 60 return -EBUSY; 61 62 thread_count_++; 63 return 0; 64 } 65 66 void ServiceDispatcher::ThreadExit() { 67 std::lock_guard<std::mutex> autolock(mutex_); 68 thread_count_--; 69 condition_.notify_one(); 70 } 71 72 int ServiceDispatcher::AddService(const std::shared_ptr<Service>& service) { 73 if (service->endpoint()->GetIpcTag() != Endpoint::kIpcTag) 74 return -EINVAL; 75 76 std::lock_guard<std::mutex> autolock(mutex_); 77 78 auto* endpoint = static_cast<Endpoint*>(service->endpoint()); 79 epoll_event event; 80 event.events = EPOLLIN; 81 event.data.ptr = service.get(); 82 83 if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_ADD, endpoint->epoll_fd(), &event) < 84 0) { 85 ALOGE("Failed to add service to dispatcher because: %s\n", strerror(errno)); 86 return -errno; 87 } 88 89 services_.push_back(service); 90 return 0; 91 } 92 93 int ServiceDispatcher::RemoveService(const std::shared_ptr<Service>& service) { 94 if (service->endpoint()->GetIpcTag() != Endpoint::kIpcTag) 95 return -EINVAL; 96 97 std::lock_guard<std::mutex> autolock(mutex_); 98 99 // It's dangerous to remove a service while other threads may be using it. 100 if (thread_count_ > 0) 101 return -EBUSY; 102 103 epoll_event dummy; // See BUGS in man 2 epoll_ctl. 104 105 auto* endpoint = static_cast<Endpoint*>(service->endpoint()); 106 if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_DEL, endpoint->epoll_fd(), &dummy) < 107 0) { 108 ALOGE("Failed to remove service from dispatcher because: %s\n", 109 strerror(errno)); 110 return -errno; 111 } 112 113 services_.remove(service); 114 return 0; 115 } 116 117 int ServiceDispatcher::ReceiveAndDispatch() { return ReceiveAndDispatch(-1); } 118 119 int ServiceDispatcher::ReceiveAndDispatch(int timeout) { 120 int ret = ThreadEnter(); 121 if (ret < 0) 122 return ret; 123 124 epoll_event events[kMaxEventsPerLoop]; 125 126 int count = epoll_wait(epoll_fd_.Get(), events, kMaxEventsPerLoop, timeout); 127 if (count <= 0) { 128 ALOGE_IF(count < 0, "Failed to wait for epoll events because: %s\n", 129 strerror(errno)); 130 ThreadExit(); 131 return count < 0 ? -errno : -ETIMEDOUT; 132 } 133 134 for (int i = 0; i < count; i++) { 135 if (events[i].data.ptr == this) { 136 ThreadExit(); 137 return -EBUSY; 138 } else { 139 Service* service = static_cast<Service*>(events[i].data.ptr); 140 141 ALOGI_IF(TRACE, "Dispatching message: fd=%d\n", 142 static_cast<Endpoint*>(service->endpoint())->epoll_fd()); 143 service->ReceiveAndDispatch(); 144 } 145 } 146 147 ThreadExit(); 148 return 0; 149 } 150 151 int ServiceDispatcher::EnterDispatchLoop() { 152 int ret = ThreadEnter(); 153 if (ret < 0) 154 return ret; 155 156 epoll_event events[kMaxEventsPerLoop]; 157 158 while (!IsCanceled()) { 159 int count = epoll_wait(epoll_fd_.Get(), events, kMaxEventsPerLoop, -1); 160 if (count < 0 && errno != EINTR) { 161 ALOGE("Failed to wait for epoll events because: %s\n", strerror(errno)); 162 ThreadExit(); 163 return -errno; 164 } 165 166 for (int i = 0; i < count; i++) { 167 if (events[i].data.ptr == this) { 168 ThreadExit(); 169 return -EBUSY; 170 } else { 171 Service* service = static_cast<Service*>(events[i].data.ptr); 172 173 ALOGI_IF(TRACE, "Dispatching message: fd=%d\n", 174 static_cast<Endpoint*>(service->endpoint())->epoll_fd()); 175 service->ReceiveAndDispatch(); 176 } 177 } 178 } 179 180 ThreadExit(); 181 return 0; 182 } 183 184 void ServiceDispatcher::SetCanceled(bool cancel) { 185 std::unique_lock<std::mutex> lock(mutex_); 186 canceled_ = cancel; 187 188 if (canceled_ && thread_count_ > 0) { 189 eventfd_write(event_fd_.Get(), 1); // Signal threads to quit. 190 191 condition_.wait(lock, [this] { return !(canceled_ && thread_count_ > 0); }); 192 193 eventfd_t value; 194 eventfd_read(event_fd_.Get(), &value); // Unsignal. 195 } 196 } 197 198 bool ServiceDispatcher::IsCanceled() const { return canceled_; } 199 200 } // namespace uds 201 } // namespace pdx 202 } // namespace android 203