1 /* http://frotznet.googlecode.com/svn/trunk/utils/fdevent.c 2 ** 3 ** Copyright 2006, Brian Swetland <swetland (at) frotz.net> 4 ** 5 ** Licensed under the Apache License, Version 2.0 (the "License"); 6 ** you may not use this file except in compliance with the License. 7 ** You may obtain a copy of the License at 8 ** 9 ** http://www.apache.org/licenses/LICENSE-2.0 10 ** 11 ** Unless required by applicable law or agreed to in writing, software 12 ** distributed under the License is distributed on an "AS IS" BASIS, 13 ** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 ** See the License for the specific language governing permissions and 15 ** limitations under the License. 16 */ 17 18 #define TRACE_TAG FDEVENT 19 20 #include "sysdeps.h" 21 #include "fdevent.h" 22 23 #include <fcntl.h> 24 #include <stdlib.h> 25 #include <string.h> 26 #include <unistd.h> 27 28 #include <atomic> 29 #include <deque> 30 #include <functional> 31 #include <list> 32 #include <mutex> 33 #include <unordered_map> 34 #include <vector> 35 36 #include <android-base/logging.h> 37 #include <android-base/stringprintf.h> 38 #include <android-base/thread_annotations.h> 39 40 #include "adb_io.h" 41 #include "adb_trace.h" 42 #include "adb_unique_fd.h" 43 #include "adb_utils.h" 44 45 #define FDE_EVENTMASK 0x00ff 46 #define FDE_STATEMASK 0xff00 47 48 #define FDE_ACTIVE 0x0100 49 #define FDE_PENDING 0x0200 50 #define FDE_CREATED 0x0400 51 52 struct PollNode { 53 fdevent* fde; 54 adb_pollfd pollfd; 55 56 explicit PollNode(fdevent* fde) : fde(fde) { 57 memset(&pollfd, 0, sizeof(pollfd)); 58 pollfd.fd = fde->fd; 59 60 #if defined(__linux__) 61 // Always enable POLLRDHUP, so the host server can take action when some clients disconnect. 62 // Then we can avoid leaving many sockets in CLOSE_WAIT state. See http://b/23314034. 63 pollfd.events = POLLRDHUP; 64 #endif 65 } 66 }; 67 68 // All operations to fdevent should happen only in the main thread. 69 // That's why we don't need a lock for fdevent. 70 static auto& g_poll_node_map = *new std::unordered_map<int, PollNode>(); 71 static auto& g_pending_list = *new std::list<fdevent*>(); 72 static std::atomic<bool> terminate_loop(false); 73 static bool main_thread_valid; 74 static unsigned long main_thread_id; 75 76 static auto& run_queue_notify_fd = *new unique_fd(); 77 static auto& run_queue_mutex = *new std::mutex(); 78 static auto& run_queue GUARDED_BY(run_queue_mutex) = *new std::deque<std::function<void()>>(); 79 80 void check_main_thread() { 81 if (main_thread_valid) { 82 CHECK_EQ(main_thread_id, adb_thread_id()); 83 } 84 } 85 86 void set_main_thread() { 87 main_thread_valid = true; 88 main_thread_id = adb_thread_id(); 89 } 90 91 static std::string dump_fde(const fdevent* fde) { 92 std::string state; 93 if (fde->state & FDE_ACTIVE) { 94 state += "A"; 95 } 96 if (fde->state & FDE_PENDING) { 97 state += "P"; 98 } 99 if (fde->state & FDE_CREATED) { 100 state += "C"; 101 } 102 if (fde->state & FDE_READ) { 103 state += "R"; 104 } 105 if (fde->state & FDE_WRITE) { 106 state += "W"; 107 } 108 if (fde->state & FDE_ERROR) { 109 state += "E"; 110 } 111 if (fde->state & FDE_DONT_CLOSE) { 112 state += "D"; 113 } 114 return android::base::StringPrintf("(fdevent %d %s)", fde->fd, state.c_str()); 115 } 116 117 fdevent* fdevent_create(int fd, fd_func func, void* arg) { 118 check_main_thread(); 119 fdevent *fde = (fdevent*) malloc(sizeof(fdevent)); 120 if(fde == 0) return 0; 121 fdevent_install(fde, fd, func, arg); 122 fde->state |= FDE_CREATED; 123 return fde; 124 } 125 126 void fdevent_destroy(fdevent* fde) { 127 check_main_thread(); 128 if(fde == 0) return; 129 if(!(fde->state & FDE_CREATED)) { 130 LOG(FATAL) << "destroying fde not created by fdevent_create(): " << dump_fde(fde); 131 } 132 fdevent_remove(fde); 133 free(fde); 134 } 135 136 void fdevent_install(fdevent* fde, int fd, fd_func func, void* arg) { 137 check_main_thread(); 138 CHECK_GE(fd, 0); 139 memset(fde, 0, sizeof(fdevent)); 140 fde->state = FDE_ACTIVE; 141 fde->fd = fd; 142 fde->func = func; 143 fde->arg = arg; 144 if (!set_file_block_mode(fd, false)) { 145 // Here is not proper to handle the error. If it fails here, some error is 146 // likely to be detected by poll(), then we can let the callback function 147 // to handle it. 148 LOG(ERROR) << "failed to set non-blocking mode for fd " << fd; 149 } 150 auto pair = g_poll_node_map.emplace(fde->fd, PollNode(fde)); 151 CHECK(pair.second) << "install existing fd " << fd; 152 D("fdevent_install %s", dump_fde(fde).c_str()); 153 } 154 155 void fdevent_remove(fdevent* fde) { 156 check_main_thread(); 157 D("fdevent_remove %s", dump_fde(fde).c_str()); 158 if (fde->state & FDE_ACTIVE) { 159 g_poll_node_map.erase(fde->fd); 160 if (fde->state & FDE_PENDING) { 161 g_pending_list.remove(fde); 162 } 163 if (!(fde->state & FDE_DONT_CLOSE)) { 164 adb_close(fde->fd); 165 fde->fd = -1; 166 } 167 fde->state = 0; 168 fde->events = 0; 169 } 170 } 171 172 static void fdevent_update(fdevent* fde, unsigned events) { 173 auto it = g_poll_node_map.find(fde->fd); 174 CHECK(it != g_poll_node_map.end()); 175 PollNode& node = it->second; 176 if (events & FDE_READ) { 177 node.pollfd.events |= POLLIN; 178 } else { 179 node.pollfd.events &= ~POLLIN; 180 } 181 182 if (events & FDE_WRITE) { 183 node.pollfd.events |= POLLOUT; 184 } else { 185 node.pollfd.events &= ~POLLOUT; 186 } 187 fde->state = (fde->state & FDE_STATEMASK) | events; 188 } 189 190 void fdevent_set(fdevent* fde, unsigned events) { 191 check_main_thread(); 192 events &= FDE_EVENTMASK; 193 if ((fde->state & FDE_EVENTMASK) == events) { 194 return; 195 } 196 CHECK(fde->state & FDE_ACTIVE); 197 fdevent_update(fde, events); 198 D("fdevent_set: %s, events = %u", dump_fde(fde).c_str(), events); 199 200 if (fde->state & FDE_PENDING) { 201 // If we are pending, make sure we don't signal an event that is no longer wanted. 202 fde->events &= events; 203 if (fde->events == 0) { 204 g_pending_list.remove(fde); 205 fde->state &= ~FDE_PENDING; 206 } 207 } 208 } 209 210 void fdevent_add(fdevent* fde, unsigned events) { 211 check_main_thread(); 212 fdevent_set(fde, (fde->state & FDE_EVENTMASK) | events); 213 } 214 215 void fdevent_del(fdevent* fde, unsigned events) { 216 check_main_thread(); 217 fdevent_set(fde, (fde->state & FDE_EVENTMASK) & ~events); 218 } 219 220 static std::string dump_pollfds(const std::vector<adb_pollfd>& pollfds) { 221 std::string result; 222 for (const auto& pollfd : pollfds) { 223 std::string op; 224 if (pollfd.events & POLLIN) { 225 op += "R"; 226 } 227 if (pollfd.events & POLLOUT) { 228 op += "W"; 229 } 230 android::base::StringAppendF(&result, " %d(%s)", pollfd.fd, op.c_str()); 231 } 232 return result; 233 } 234 235 static void fdevent_process() { 236 std::vector<adb_pollfd> pollfds; 237 for (const auto& pair : g_poll_node_map) { 238 pollfds.push_back(pair.second.pollfd); 239 } 240 CHECK_GT(pollfds.size(), 0u); 241 D("poll(), pollfds = %s", dump_pollfds(pollfds).c_str()); 242 int ret = adb_poll(&pollfds[0], pollfds.size(), -1); 243 if (ret == -1) { 244 PLOG(ERROR) << "poll(), ret = " << ret; 245 return; 246 } 247 for (const auto& pollfd : pollfds) { 248 if (pollfd.revents != 0) { 249 D("for fd %d, revents = %x", pollfd.fd, pollfd.revents); 250 } 251 unsigned events = 0; 252 if (pollfd.revents & POLLIN) { 253 events |= FDE_READ; 254 } 255 if (pollfd.revents & POLLOUT) { 256 events |= FDE_WRITE; 257 } 258 if (pollfd.revents & (POLLERR | POLLHUP | POLLNVAL)) { 259 // We fake a read, as the rest of the code assumes that errors will 260 // be detected at that point. 261 events |= FDE_READ | FDE_ERROR; 262 } 263 #if defined(__linux__) 264 if (pollfd.revents & POLLRDHUP) { 265 events |= FDE_READ | FDE_ERROR; 266 } 267 #endif 268 if (events != 0) { 269 auto it = g_poll_node_map.find(pollfd.fd); 270 CHECK(it != g_poll_node_map.end()); 271 fdevent* fde = it->second.fde; 272 CHECK_EQ(fde->fd, pollfd.fd); 273 fde->events |= events; 274 D("%s got events %x", dump_fde(fde).c_str(), events); 275 fde->state |= FDE_PENDING; 276 g_pending_list.push_back(fde); 277 } 278 } 279 } 280 281 static void fdevent_call_fdfunc(fdevent* fde) { 282 unsigned events = fde->events; 283 fde->events = 0; 284 CHECK(fde->state & FDE_PENDING); 285 fde->state &= (~FDE_PENDING); 286 D("fdevent_call_fdfunc %s", dump_fde(fde).c_str()); 287 fde->func(fde->fd, events, fde->arg); 288 } 289 290 static void fdevent_run_flush() EXCLUDES(run_queue_mutex) { 291 // We need to be careful around reentrancy here, since a function we call can queue up another 292 // function. 293 while (true) { 294 std::function<void()> fn; 295 { 296 std::lock_guard<std::mutex> lock(run_queue_mutex); 297 if (run_queue.empty()) { 298 break; 299 } 300 fn = run_queue.front(); 301 run_queue.pop_front(); 302 } 303 fn(); 304 } 305 } 306 307 static void fdevent_run_func(int fd, unsigned ev, void* /* userdata */) { 308 CHECK_GE(fd, 0); 309 CHECK(ev & FDE_READ); 310 311 char buf[1024]; 312 313 // Empty the fd. 314 if (adb_read(fd, buf, sizeof(buf)) == -1) { 315 PLOG(FATAL) << "failed to empty run queue notify fd"; 316 } 317 318 fdevent_run_flush(); 319 } 320 321 static void fdevent_run_setup() { 322 { 323 std::lock_guard<std::mutex> lock(run_queue_mutex); 324 CHECK(run_queue_notify_fd.get() == -1); 325 int s[2]; 326 if (adb_socketpair(s) != 0) { 327 PLOG(FATAL) << "failed to create run queue notify socketpair"; 328 } 329 330 if (!set_file_block_mode(s[0], false) || !set_file_block_mode(s[1], false)) { 331 PLOG(FATAL) << "failed to make run queue notify socket nonblocking"; 332 } 333 334 run_queue_notify_fd.reset(s[0]); 335 fdevent* fde = fdevent_create(s[1], fdevent_run_func, nullptr); 336 CHECK(fde != nullptr); 337 fdevent_add(fde, FDE_READ); 338 } 339 340 fdevent_run_flush(); 341 } 342 343 void fdevent_run_on_main_thread(std::function<void()> fn) { 344 std::lock_guard<std::mutex> lock(run_queue_mutex); 345 run_queue.push_back(std::move(fn)); 346 347 // run_queue_notify_fd could still be -1 if we're called before fdevent has finished setting up. 348 // In that case, rely on the setup code to flush the queue without a notification being needed. 349 if (run_queue_notify_fd != -1) { 350 int rc = adb_write(run_queue_notify_fd.get(), "", 1); 351 352 // It's possible that we get EAGAIN here, if lots of notifications came in while handling. 353 if (rc == 0) { 354 PLOG(FATAL) << "run queue notify fd was closed?"; 355 } else if (rc == -1 && errno != EAGAIN) { 356 PLOG(FATAL) << "failed to write to run queue notify fd"; 357 } 358 } 359 } 360 361 void fdevent_loop() { 362 set_main_thread(); 363 fdevent_run_setup(); 364 365 while (true) { 366 if (terminate_loop) { 367 return; 368 } 369 370 D("--- --- waiting for events"); 371 372 fdevent_process(); 373 374 while (!g_pending_list.empty()) { 375 fdevent* fde = g_pending_list.front(); 376 g_pending_list.pop_front(); 377 fdevent_call_fdfunc(fde); 378 } 379 } 380 } 381 382 void fdevent_terminate_loop() { 383 terminate_loop = true; 384 } 385 386 size_t fdevent_installed_count() { 387 return g_poll_node_map.size(); 388 } 389 390 void fdevent_reset() { 391 g_poll_node_map.clear(); 392 g_pending_list.clear(); 393 394 std::lock_guard<std::mutex> lock(run_queue_mutex); 395 run_queue_notify_fd.reset(); 396 run_queue.clear(); 397 398 main_thread_valid = false; 399 terminate_loop = false; 400 } 401